diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
index 740f12e7d13d..bf59152c8c0c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -201,7 +201,7 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP
val stages = jobData.stageIds.map { stageId =>
// This could be empty if the listener hasn't received information about the
// stage or if the stage information has been garbage collected
- store.stageData(stageId).lastOption.getOrElse {
+ store.asOption(store.lastStageAttempt(stageId)).getOrElse {
new v1.StageData(
v1.StageStatus.PENDING,
stageId,
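
Note on the JobPage change above: it relies on AppStatusStore.asOption turning a failed store lookup into an Option so the page can fall back to a synthetic PENDING StageData. A minimal sketch of that pattern (not the actual Spark implementation; it assumes the underlying getter signals a missing stage with NoSuchElementException):

    object StoreSketch {
      private val stages = Map((1, 0) -> "stage 1 / attempt 0")

      // Assumption: the raw lookup throws NoSuchElementException when the stage
      // was never seen by the listener or has been garbage collected.
      def lastStageAttempt(stageId: Int): String =
        stages.getOrElse((stageId, 0),
          throw new NoSuchElementException(s"unknown stage $stageId"))

      // asOption converts that failure into None so callers can supply a default.
      def asOption[T](fn: => T): Option[T] =
        try Some(fn) catch { case _: NoSuchElementException => None }
    }

    // StoreSketch.asOption(StoreSketch.lastStageAttempt(2))  // None -> fall back to PENDING
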
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 99eab1b2a27d..ff1b75e5c506 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -34,10 +34,10 @@ private[ui] class JobsTab(parent: SparkUI, store: AppStatusStore)
val killEnabled = parent.killEnabled
def isFairScheduler: Boolean = {
- store.environmentInfo().sparkProperties.toMap
- .get("spark.scheduler.mode")
- .map { mode => mode == SchedulingMode.FAIR }
- .getOrElse(false)
+ store
+ .environmentInfo()
+ .sparkProperties
+ .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
}
def getSparkUser: String = parent.getSparkUser
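
The rewritten isFairScheduler above works because environmentInfo().sparkProperties is exposed as a sequence of (key, value) pairs, so a tuple containment check replaces building a Map and comparing against SchedulingMode.FAIR on every call. A small self-contained sketch of the equivalence (the property name is the real one; the data is made up):

    object FairSchedulerCheck {
      def isFair(sparkProperties: Seq[(String, String)]): Boolean =
        sparkProperties.contains(("spark.scheduler.mode", "FAIR"))

      def main(args: Array[String]): Unit = {
        val props = Seq("spark.app.name" -> "demo", "spark.scheduler.mode" -> "FAIR")
        println(isFair(props))                                   // true
        println(isFair(Seq("spark.scheduler.mode" -> "FIFO")))   // false
        println(isFair(Seq("spark.app.name" -> "demo")))         // false, like the old getOrElse(false)
      }
    }
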
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 11a6a3434497..af78373ddb4b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -19,6 +19,7 @@ package org.apache.spark.ui.jobs
import java.net.URLEncoder
import java.util.Date
+import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.{HashMap, HashSet}
@@ -29,15 +30,14 @@ import org.apache.commons.lang3.StringEscapeUtils
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._
import org.apache.spark.scheduler.TaskLocality
-import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status._
import org.apache.spark.status.api.v1._
import org.apache.spark.ui._
-import org.apache.spark.util.{Distribution, Utils}
+import org.apache.spark.util.Utils
/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends WebUIPage("stage") {
import ApiHelper._
- import StagePage._
private val TIMELINE_LEGEND = {
@@ -67,17 +67,17 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
// if we find that it's okay.
private val MAX_TIMELINE_TASKS = parent.conf.getInt("spark.ui.timeline.tasks.maximum", 1000)
- private def getLocalitySummaryString(stageData: StageData, taskList: Seq[TaskData]): String = {
- val localities = taskList.map(_.taskLocality)
- val localityCounts = localities.groupBy(identity).mapValues(_.size)
+ private def getLocalitySummaryString(localitySummary: Map[String, Long]): String = {
val names = Map(
TaskLocality.PROCESS_LOCAL.toString() -> "Process local",
TaskLocality.NODE_LOCAL.toString() -> "Node local",
TaskLocality.RACK_LOCAL.toString() -> "Rack local",
TaskLocality.ANY.toString() -> "Any")
- val localityNamesAndCounts = localityCounts.toSeq.map { case (locality, count) =>
- s"${names(locality)}: $count"
- }
+ val localityNamesAndCounts = names.flatMap { case (key, name) =>
+ localitySummary.get(key).map { count =>
+ s"$name: $count"
+ }
+ }.toSeq
localityNamesAndCounts.sorted.mkString("; ")
}
@@ -108,7 +108,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val stageHeader = s"Details for Stage $stageId (Attempt $stageAttemptId)"
val stageData = parent.store
- .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = true))
+ .asOption(parent.store.stageAttempt(stageId, stageAttemptId, details = false))
.getOrElse {
val content =
@@ -117,8 +117,11 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
return UIUtils.headerSparkPage(stageHeader, content, parent)
}
- val tasks = stageData.tasks.getOrElse(Map.empty).values.toSeq
- if (tasks.isEmpty) {
+ val localitySummary = store.localitySummary(stageData.stageId, stageData.attemptId)
+
+ val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
+ stageData.numFailedTasks + stageData.numKilledTasks
+ if (totalTasks == 0) {
val content =
Summary Metrics No tasks have started yet
@@ -127,18 +130,14 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
return UIUtils.headerSparkPage(stageHeader, content, parent)
}
+ val storedTasks = store.taskCount(stageData.stageId, stageData.attemptId)
val numCompleted = stageData.numCompleteTasks
- val totalTasks = stageData.numActiveTasks + stageData.numCompleteTasks +
- stageData.numFailedTasks + stageData.numKilledTasks
- val totalTasksNumStr = if (totalTasks == tasks.size) {
+ val totalTasksNumStr = if (totalTasks == storedTasks) {
s"$totalTasks"
} else {
- s"$totalTasks, showing ${tasks.size}"
+ s"$totalTasks, showing ${storedTasks}"
}
- val externalAccumulables = stageData.accumulatorUpdates
- val hasAccumulators = externalAccumulables.size > 0
-
val summary =
@@ -148,7 +147,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
<li>
<strong>Locality Level Summary: </strong>
- {getLocalitySummaryString(stageData, tasks)}
+ {getLocalitySummaryString(localitySummary)}
</li>
{if (hasInput(stageData)) {
<li>
@@ -266,7 +265,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val accumulableTable = UIUtils.listingTable(
accumulableHeaders,
accumulableRow,
- externalAccumulables.toSeq)
+ stageData.accumulatorUpdates.toSeq)
val page: Int = {
// If the user has changed to a larger page size, then go to page 1 in order to avoid
@@ -280,16 +279,9 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val currentTime = System.currentTimeMillis()
val (taskTable, taskTableHTML) = try {
val _taskTable = new TaskPagedTable(
- parent.conf,
+ stageData,
UIUtils.prependBaseUri(parent.basePath) +
s"/stages/stage?id=${stageId}&attempt=${stageAttemptId}",
- tasks,
- hasAccumulators,
- hasInput(stageData),
- hasOutput(stageData),
- hasShuffleRead(stageData),
- hasShuffleWrite(stageData),
- hasBytesSpilled(stageData),
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
@@ -320,217 +312,155 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
| }
|});
""".stripMargin
- }
+ }
}
- val taskIdsInPage = if (taskTable == null) Set.empty[Long]
- else taskTable.dataSource.slicedTaskIds
+ val metricsSummary = store.taskSummary(stageData.stageId, stageData.attemptId,
+ Array(0, 0.25, 0.5, 0.75, 1.0))
- // Excludes tasks which failed and have incomplete metrics
- val validTasks = tasks.filter(t => t.status == "SUCCESS" && t.taskMetrics.isDefined)
-
- val summaryTable: Option[Seq[Node]] =
- if (validTasks.size == 0) {
- None
- } else {
- def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] = {
- Distribution(data).get.getQuantiles()
- }
- def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = {
- getDistributionQuantiles(times).map { millis =>
- <td>{UIUtils.formatDuration(millis.toLong)}</td>
- }
- }
- def getFormattedSizeQuantiles(data: Seq[Double]): Seq[Elem] = {
- getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>)
+ val summaryTable = metricsSummary.map { metrics =>
+ def timeQuantiles(data: IndexedSeq[Double]): Seq[Node] = {
+ data.map { millis =>
+ <td>{UIUtils.formatDuration(millis.toLong)}</td>
}
+ }
- val deserializationTimes = validTasks.map { task =>
- task.taskMetrics.get.executorDeserializeTime.toDouble
- }
- val deserializationQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.TASK_DESERIALIZATION_TIME}
- data-placement="right">
- Task Deserialization Time
- </span>
- </td> +: getFormattedTimeQuantiles(deserializationTimes)
-
- val serviceTimes = validTasks.map(_.taskMetrics.get.executorRunTime.toDouble)
- val serviceQuantiles = <td>Duration</td> +: getFormattedTimeQuantiles(serviceTimes)
-
- val gcTimes = validTasks.map(_.taskMetrics.get.jvmGcTime.toDouble)
- val gcQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.GC_TIME} data-placement="right">
- GC Time
- </span>
- </td> +: getFormattedTimeQuantiles(gcTimes)
-
- val serializationTimes = validTasks.map(_.taskMetrics.get.resultSerializationTime.toDouble)
- val serializationQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.RESULT_SERIALIZATION_TIME}
- data-placement="right">
- Result Serialization Time
- </span>
- </td> +: getFormattedTimeQuantiles(serializationTimes)
-
- val gettingResultTimes = validTasks.map(getGettingResultTime(_, currentTime).toDouble)
- val gettingResultQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.GETTING_RESULT_TIME} data-placement="right">
- Getting Result Time
- </span>
- </td> +:
- getFormattedTimeQuantiles(gettingResultTimes)
-
- val peakExecutionMemory = validTasks.map(_.taskMetrics.get.peakExecutionMemory.toDouble)
- val peakExecutionMemoryQuantiles = {
- <td>
- <span data-toggle="tooltip" title={ToolTips.PEAK_EXECUTION_MEMORY} data-placement="right">
- Peak Execution Memory
- </span>
- </td> +: getFormattedSizeQuantiles(peakExecutionMemory)
+ def sizeQuantiles(data: IndexedSeq[Double]): Seq[Node] = {
+ data.map { size =>
+ <td>{Utils.bytesToString(size.toLong)}</td>
}
+ }
- // The scheduler delay includes the network delay to send the task to the worker
- // machine and to send back the result (but not the time to fetch the task result,
- // if it needed to be fetched from the block manager on the worker).
- val schedulerDelays = validTasks.map { task =>
- getSchedulerDelay(task, task.taskMetrics.get, currentTime).toDouble
- }
- val schedulerDelayTitle = <td><span data-toggle="tooltip"
- title={ToolTips.SCHEDULER_DELAY} data-placement="right">Scheduler Delay</span></td>
- val schedulerDelayQuantiles = schedulerDelayTitle +:
- getFormattedTimeQuantiles(schedulerDelays)
- def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double])
- : Seq[Elem] = {
- val recordDist = getDistributionQuantiles(records).iterator
- getDistributionQuantiles(data).map(d =>
- {s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"} |
- )
+ def sizeQuantilesWithRecords(
+ data: IndexedSeq[Double],
+ records: IndexedSeq[Double]) : Seq[Node] = {
+ data.zip(records).map { case (d, r) =>
+ {s"${Utils.bytesToString(d.toLong)} / ${r.toLong}"} |
}
+ }
- val inputSizes = validTasks.map(_.taskMetrics.get.inputMetrics.bytesRead.toDouble)
- val inputRecords = validTasks.map(_.taskMetrics.get.inputMetrics.recordsRead.toDouble)
- val inputQuantiles = <td>Input Size / Records</td> +:
- getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords)
+ def titleCell(title: String, tooltip: String): Seq[Node] = {
+ <td>
+ <span data-toggle="tooltip" title={tooltip} data-placement="right">
+ {title}
+ </span>
+ </td>
+ }
- val outputSizes = validTasks.map(_.taskMetrics.get.outputMetrics.bytesWritten.toDouble)
- val outputRecords = validTasks.map(_.taskMetrics.get.outputMetrics.recordsWritten.toDouble)
- val outputQuantiles = <td>Output Size / Records</td> +:
- getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords)
+ def simpleTitleCell(title: String): Seq[Node] = <td>{title}</td>
- val shuffleReadBlockedTimes = validTasks.map { task =>
- task.taskMetrics.get.shuffleReadMetrics.fetchWaitTime.toDouble
- }
- val shuffleReadBlockedQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ_BLOCKED_TIME}
- data-placement="right">
- Shuffle Read Blocked Time
- </span>
- </td> +:
- getFormattedTimeQuantiles(shuffleReadBlockedTimes)
-
- val shuffleReadTotalSizes = validTasks.map { task =>
- totalBytesRead(task.taskMetrics.get.shuffleReadMetrics).toDouble
- }
- val shuffleReadTotalRecords = validTasks.map { task =>
- task.taskMetrics.get.shuffleReadMetrics.recordsRead.toDouble
- }
- val shuffleReadTotalQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}
- data-placement="right">
- Shuffle Read Size / Records
- </span>
- </td> +:
- getFormattedSizeQuantilesWithRecords(shuffleReadTotalSizes, shuffleReadTotalRecords)
-
- val shuffleReadRemoteSizes = validTasks.map { task =>
- task.taskMetrics.get.shuffleReadMetrics.remoteBytesRead.toDouble
- }
- val shuffleReadRemoteQuantiles =
- <td>
- <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ_REMOTE_SIZE}
- data-placement="right">
- Shuffle Remote Reads
- </span>
- </td> +:
- getFormattedSizeQuantiles(shuffleReadRemoteSizes)
-
- val shuffleWriteSizes = validTasks.map { task =>
- task.taskMetrics.get.shuffleWriteMetrics.bytesWritten.toDouble
- }
+ val deserializationQuantiles = titleCell("Task Deserialization Time",
+ ToolTips.TASK_DESERIALIZATION_TIME) ++ timeQuantiles(metrics.executorDeserializeTime)
- val shuffleWriteRecords = validTasks.map { task =>
- task.taskMetrics.get.shuffleWriteMetrics.recordsWritten.toDouble
- }
+ val serviceQuantiles = simpleTitleCell("Duration") ++ timeQuantiles(metrics.executorRunTime)
- val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
- getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords)
+ val gcQuantiles = titleCell("GC Time", ToolTips.GC_TIME) ++ timeQuantiles(metrics.jvmGcTime)
- val memoryBytesSpilledSizes = validTasks.map(_.taskMetrics.get.memoryBytesSpilled.toDouble)
- val memoryBytesSpilledQuantiles = <td>Shuffle spill (memory)</td> +:
- getFormattedSizeQuantiles(memoryBytesSpilledSizes)
+ val serializationQuantiles = titleCell("Result Serialization Time",
+ ToolTips.RESULT_SERIALIZATION_TIME) ++ timeQuantiles(metrics.resultSerializationTime)
- val diskBytesSpilledSizes = validTasks.map(_.taskMetrics.get.diskBytesSpilled.toDouble)
- val diskBytesSpilledQuantiles = <td>Shuffle spill (disk)</td> +:
- getFormattedSizeQuantiles(diskBytesSpilledSizes)
+ val gettingResultQuantiles = titleCell("Getting Result Time", ToolTips.GETTING_RESULT_TIME) ++
+ timeQuantiles(metrics.gettingResultTime)
- val listings: Seq[Seq[Node]] = Seq(
- <tr>{serviceQuantiles}</tr>,
- <tr>{schedulerDelayQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
- {deserializationQuantiles}
- </tr>
- <tr>{gcQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
- {serializationQuantiles}
- </tr>,
- <tr>{gettingResultQuantiles}</tr>,
- <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
- {peakExecutionMemoryQuantiles}
- </tr>,
- if (hasInput(stageData)) <tr>{inputQuantiles}</tr> else Nil,
- if (hasOutput(stageData)) <tr>{outputQuantiles}</tr> else Nil,
- if (hasShuffleRead(stageData)) {
- <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
- {shuffleReadBlockedQuantiles}
- </tr>
- <tr>{shuffleReadTotalQuantiles}</tr>
- <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
- {shuffleReadRemoteQuantiles}
- </tr>
- } else {
- Nil
- },
- if (hasShuffleWrite(stageData)) <tr>{shuffleWriteQuantiles}</tr> else Nil,
- if (hasBytesSpilled(stageData)) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
- if (hasBytesSpilled(stageData)) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
-
- val quantileHeaders = Seq("Metric", "Min", "25th percentile",
- "Median", "75th percentile", "Max")
- // The summary table does not use CSS to stripe rows, which doesn't work with hidden
- // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
- Some(UIUtils.listingTable(
- quantileHeaders,
- identity[Seq[Node]],
- listings,
- fixedWidth = true,
- id = Some("task-summary-table"),
- stripeRowsWithCss = false))
+ val peakExecutionMemoryQuantiles = titleCell("Peak Execution Memory",
+ ToolTips.PEAK_EXECUTION_MEMORY) ++ sizeQuantiles(metrics.peakExecutionMemory)
+
+ // The scheduler delay includes the network delay to send the task to the worker
+ // machine and to send back the result (but not the time to fetch the task result,
+ // if it needed to be fetched from the block manager on the worker).
+ val schedulerDelayQuantiles = titleCell("Scheduler Delay", ToolTips.SCHEDULER_DELAY) ++
+ timeQuantiles(metrics.schedulerDelay)
+
+ def inputQuantiles: Seq[Node] = {
+ simpleTitleCell("Input Size / Records") ++
+ sizeQuantilesWithRecords(metrics.inputMetrics.bytesRead, metrics.inputMetrics.recordsRead)
+ }
+
+ def outputQuantiles: Seq[Node] = {
+ simpleTitleCell("Output Size / Records") ++
+ sizeQuantilesWithRecords(metrics.outputMetrics.bytesWritten,
+ metrics.outputMetrics.recordsWritten)
}
+ def shuffleReadBlockedQuantiles: Seq[Node] = {
+ titleCell("Shuffle Read Blocked Time", ToolTips.SHUFFLE_READ_BLOCKED_TIME) ++
+ timeQuantiles(metrics.shuffleReadMetrics.fetchWaitTime)
+ }
+
+ def shuffleReadTotalQuantiles: Seq[Node] = {
+ titleCell("Shuffle Read Size / Records", ToolTips.SHUFFLE_READ) ++
+ sizeQuantilesWithRecords(metrics.shuffleReadMetrics.readBytes,
+ metrics.shuffleReadMetrics.readRecords)
+ }
+
+ def shuffleReadRemoteQuantiles: Seq[Node] = {
+ titleCell("Shuffle Remote Reads", ToolTips.SHUFFLE_READ_REMOTE_SIZE) ++
+ sizeQuantiles(metrics.shuffleReadMetrics.remoteBytesRead)
+ }
+
+ def shuffleWriteQuantiles: Seq[Node] = {
+ simpleTitleCell("Shuffle Write Size / Records") ++
+ sizeQuantilesWithRecords(metrics.shuffleWriteMetrics.writeBytes,
+ metrics.shuffleWriteMetrics.writeRecords)
+ }
+
+ def memoryBytesSpilledQuantiles: Seq[Node] = {
+ simpleTitleCell("Shuffle spill (memory)") ++ sizeQuantiles(metrics.memoryBytesSpilled)
+ }
+
+ def diskBytesSpilledQuantiles: Seq[Node] = {
+ simpleTitleCell("Shuffle spill (disk)") ++ sizeQuantiles(metrics.diskBytesSpilled)
+ }
+
+ val listings: Seq[Seq[Node]] = Seq(
+ <tr>{serviceQuantiles}</tr>,
+ <tr>{schedulerDelayQuantiles}</tr>,
+ <tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
+ {deserializationQuantiles}
+ </tr>
+ <tr>{gcQuantiles}</tr>,
+ <tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
+ {serializationQuantiles}
+ </tr>,
+ <tr>{gettingResultQuantiles}</tr>,
+ <tr class={TaskDetailsClassNames.PEAK_EXECUTION_MEMORY}>
+ {peakExecutionMemoryQuantiles}
+ </tr>,
+ if (hasInput(stageData)) <tr>{inputQuantiles}</tr> else Nil,
+ if (hasOutput(stageData)) <tr>{outputQuantiles}</tr> else Nil,
+ if (hasShuffleRead(stageData)) {
+ <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}>
+ {shuffleReadBlockedQuantiles}
+ </tr>
+ <tr>{shuffleReadTotalQuantiles}</tr>
+ <tr class={TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE}>
+ {shuffleReadRemoteQuantiles}
+ </tr>
+ } else {
+ Nil
+ },
+ if (hasShuffleWrite(stageData)) <tr>{shuffleWriteQuantiles}</tr> else Nil,
+ if (hasBytesSpilled(stageData)) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil,
+ if (hasBytesSpilled(stageData)) <tr>{diskBytesSpilledQuantiles}</tr> else Nil)
+
+ val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile",
+ "Max")
+ // The summary table does not use CSS to stripe rows, which doesn't work with hidden
+ // rows (instead, JavaScript in table.js is used to stripe the non-hidden rows).
+ UIUtils.listingTable(
+ quantileHeaders,
+ identity[Seq[Node]],
+ listings,
+ fixedWidth = true,
+ id = Some("task-summary-table"),
+ stripeRowsWithCss = false)
+ }
+
val executorTable = new ExecutorTable(stageData, parent.store)
val maybeAccumulableTable: Seq[Node] =
- if (hasAccumulators) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
+ if (hasAccumulators(stageData)) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()
val aggMetrics =
taskIdsInPage.contains(t.taskId) },
+ Option(taskTable).map(_.dataSource.tasks).getOrElse(Nil),
currentTime) ++
++
{summaryTable.getOrElse("No tasks have reported metrics yet.")} ++
@@ -593,10 +523,9 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val serializationTimeProportion = toProportion(serializationTime)
val deserializationTime = metricsOpt.map(_.executorDeserializeTime).getOrElse(0L)
val deserializationTimeProportion = toProportion(deserializationTime)
- val gettingResultTime = getGettingResultTime(taskInfo, currentTime)
+ val gettingResultTime = AppStatusUtils.gettingResultTime(taskInfo)
val gettingResultTimeProportion = toProportion(gettingResultTime)
- val schedulerDelay =
- metricsOpt.map(getSchedulerDelay(taskInfo, _, currentTime)).getOrElse(0L)
+ val schedulerDelay = AppStatusUtils.schedulerDelay(taskInfo)
val schedulerDelayProportion = toProportion(schedulerDelay)
val executorOverhead = serializationTime + deserializationTime
@@ -708,7 +637,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
{
if (MAX_TIMELINE_TASKS < tasks.size) {
- This stage has more than the maximum number of tasks that can be shown in the
+ This page has more than the maximum number of tasks that can be shown in the
visualization! Only the most recent {MAX_TIMELINE_TASKS} tasks
(of {tasks.size} total) are shown.
@@ -733,402 +662,49 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
}
-private[ui] object StagePage {
- private[ui] def getGettingResultTime(info: TaskData, currentTime: Long): Long = {
- info.resultFetchStart match {
- case Some(start) =>
- info.duration match {
- case Some(duration) =>
- info.launchTime.getTime() + duration - start.getTime()
-
- case _ =>
- currentTime - start.getTime()
- }
-
- case _ =>
- 0L
- }
- }
-
- private[ui] def getSchedulerDelay(
- info: TaskData,
- metrics: TaskMetrics,
- currentTime: Long): Long = {
- info.duration match {
- case Some(duration) =>
- val executorOverhead = metrics.executorDeserializeTime + metrics.resultSerializationTime
- math.max(
- 0,
- duration - metrics.executorRunTime - executorOverhead -
- getGettingResultTime(info, currentTime))
-
- case _ =>
- // The task is still running and the metrics like executorRunTime are not available.
- 0L
- }
- }
-
-}
-
-private[ui] case class TaskTableRowInputData(inputSortable: Long, inputReadable: String)
-
-private[ui] case class TaskTableRowOutputData(outputSortable: Long, outputReadable: String)
-
-private[ui] case class TaskTableRowShuffleReadData(
- shuffleReadBlockedTimeSortable: Long,
- shuffleReadBlockedTimeReadable: String,
- shuffleReadSortable: Long,
- shuffleReadReadable: String,
- shuffleReadRemoteSortable: Long,
- shuffleReadRemoteReadable: String)
-
-private[ui] case class TaskTableRowShuffleWriteData(
- writeTimeSortable: Long,
- writeTimeReadable: String,
- shuffleWriteSortable: Long,
- shuffleWriteReadable: String)
-
-private[ui] case class TaskTableRowBytesSpilledData(
- memoryBytesSpilledSortable: Long,
- memoryBytesSpilledReadable: String,
- diskBytesSpilledSortable: Long,
- diskBytesSpilledReadable: String)
-
-/**
- * Contains all data that needs for sorting and generating HTML. Using this one rather than
- * TaskData to avoid creating duplicate contents during sorting the data.
- */
-private[ui] class TaskTableRowData(
- val index: Int,
- val taskId: Long,
- val attempt: Int,
- val speculative: Boolean,
- val status: String,
- val taskLocality: String,
- val executorId: String,
- val host: String,
- val launchTime: Long,
- val duration: Long,
- val formatDuration: String,
- val schedulerDelay: Long,
- val taskDeserializationTime: Long,
- val gcTime: Long,
- val serializationTime: Long,
- val gettingResultTime: Long,
- val peakExecutionMemoryUsed: Long,
- val accumulators: Option[String], // HTML
- val input: Option[TaskTableRowInputData],
- val output: Option[TaskTableRowOutputData],
- val shuffleRead: Option[TaskTableRowShuffleReadData],
- val shuffleWrite: Option[TaskTableRowShuffleWriteData],
- val bytesSpilled: Option[TaskTableRowBytesSpilledData],
- val error: String,
- val logs: Map[String, String])
-
private[ui] class TaskDataSource(
- tasks: Seq[TaskData],
- hasAccumulators: Boolean,
- hasInput: Boolean,
- hasOutput: Boolean,
- hasShuffleRead: Boolean,
- hasShuffleWrite: Boolean,
- hasBytesSpilled: Boolean,
+ stage: StageData,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
- store: AppStatusStore) extends PagedDataSource[TaskTableRowData](pageSize) {
- import StagePage._
+ store: AppStatusStore) extends PagedDataSource[TaskData](pageSize) {
+ import ApiHelper._
// Keep an internal cache of executor log maps so that long task lists render faster.
private val executorIdToLogs = new HashMap[String, Map[String, String]]()
- // Convert TaskData to TaskTableRowData which contains the final contents to show in the table
- // so that we can avoid creating duplicate contents during sorting the data
- private val data = tasks.map(taskRow).sorted(ordering(sortColumn, desc))
-
- private var _slicedTaskIds: Set[Long] = _
+ private var _tasksToShow: Seq[TaskData] = null
- override def dataSize: Int = data.size
+ override def dataSize: Int = stage.numTasks
- override def sliceData(from: Int, to: Int): Seq[TaskTableRowData] = {
- val r = data.slice(from, to)
- _slicedTaskIds = r.map(_.taskId).toSet
- r
- }
-
- def slicedTaskIds: Set[Long] = _slicedTaskIds
-
- private def taskRow(info: TaskData): TaskTableRowData = {
- val metrics = info.taskMetrics
- val duration = info.duration.getOrElse(1L)
- val formatDuration = info.duration.map(d => UIUtils.formatDuration(d)).getOrElse("")
- val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L)
- val gcTime = metrics.map(_.jvmGcTime).getOrElse(0L)
- val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L)
- val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L)
- val gettingResultTime = getGettingResultTime(info, currentTime)
-
- val externalAccumulableReadable = info.accumulatorUpdates.map { acc =>
- StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update}")
+ override def sliceData(from: Int, to: Int): Seq[TaskData] = {
+ if (_tasksToShow == null) {
+ _tasksToShow = store.taskList(stage.stageId, stage.attemptId, from, to - from,
+ indexName(sortColumn), !desc)
}
- val peakExecutionMemoryUsed = metrics.map(_.peakExecutionMemory).getOrElse(0L)
-
- val maybeInput = metrics.map(_.inputMetrics)
- val inputSortable = maybeInput.map(_.bytesRead).getOrElse(0L)
- val inputReadable = maybeInput
- .map(m => s"${Utils.bytesToString(m.bytesRead)}")
- .getOrElse("")
- val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("")
-
- val maybeOutput = metrics.map(_.outputMetrics)
- val outputSortable = maybeOutput.map(_.bytesWritten).getOrElse(0L)
- val outputReadable = maybeOutput
- .map(m => s"${Utils.bytesToString(m.bytesWritten)}")
- .getOrElse("")
- val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("")
-
- val maybeShuffleRead = metrics.map(_.shuffleReadMetrics)
- val shuffleReadBlockedTimeSortable = maybeShuffleRead.map(_.fetchWaitTime).getOrElse(0L)
- val shuffleReadBlockedTimeReadable =
- maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("")
-
- val totalShuffleBytes = maybeShuffleRead.map(ApiHelper.totalBytesRead)
- val shuffleReadSortable = totalShuffleBytes.getOrElse(0L)
- val shuffleReadReadable = totalShuffleBytes.map(Utils.bytesToString).getOrElse("")
- val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("")
-
- val remoteShuffleBytes = maybeShuffleRead.map(_.remoteBytesRead)
- val shuffleReadRemoteSortable = remoteShuffleBytes.getOrElse(0L)
- val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")
-
- val maybeShuffleWrite = metrics.map(_.shuffleWriteMetrics)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
- val shuffleWriteReadable = maybeShuffleWrite
- .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
- val shuffleWriteRecords = maybeShuffleWrite
- .map(_.recordsWritten.toString).getOrElse("")
-
- val maybeWriteTime = metrics.map(_.shuffleWriteMetrics.writeTime)
- val writeTimeSortable = maybeWriteTime.getOrElse(0L)
- val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
- if (ms == 0) "" else UIUtils.formatDuration(ms)
- }.getOrElse("")
-
- val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled)
- val memoryBytesSpilledSortable = maybeMemoryBytesSpilled.getOrElse(0L)
- val memoryBytesSpilledReadable =
- maybeMemoryBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- val maybeDiskBytesSpilled = metrics.map(_.diskBytesSpilled)
- val diskBytesSpilledSortable = maybeDiskBytesSpilled.getOrElse(0L)
- val diskBytesSpilledReadable = maybeDiskBytesSpilled.map(Utils.bytesToString).getOrElse("")
-
- val input =
- if (hasInput) {
- Some(TaskTableRowInputData(inputSortable, s"$inputReadable / $inputRecords"))
- } else {
- None
- }
-
- val output =
- if (hasOutput) {
- Some(TaskTableRowOutputData(outputSortable, s"$outputReadable / $outputRecords"))
- } else {
- None
- }
-
- val shuffleRead =
- if (hasShuffleRead) {
- Some(TaskTableRowShuffleReadData(
- shuffleReadBlockedTimeSortable,
- shuffleReadBlockedTimeReadable,
- shuffleReadSortable,
- s"$shuffleReadReadable / $shuffleReadRecords",
- shuffleReadRemoteSortable,
- shuffleReadRemoteReadable
- ))
- } else {
- None
- }
-
- val shuffleWrite =
- if (hasShuffleWrite) {
- Some(TaskTableRowShuffleWriteData(
- writeTimeSortable,
- writeTimeReadable,
- shuffleWriteSortable,
- s"$shuffleWriteReadable / $shuffleWriteRecords"
- ))
- } else {
- None
- }
-
- val bytesSpilled =
- if (hasBytesSpilled) {
- Some(TaskTableRowBytesSpilledData(
- memoryBytesSpilledSortable,
- memoryBytesSpilledReadable,
- diskBytesSpilledSortable,
- diskBytesSpilledReadable
- ))
- } else {
- None
- }
-
- new TaskTableRowData(
- info.index,
- info.taskId,
- info.attempt,
- info.speculative,
- info.status,
- info.taskLocality.toString,
- info.executorId,
- info.host,
- info.launchTime.getTime(),
- duration,
- formatDuration,
- schedulerDelay,
- taskDeserializationTime,
- gcTime,
- serializationTime,
- gettingResultTime,
- peakExecutionMemoryUsed,
- if (hasAccumulators) Some(externalAccumulableReadable.mkString(" ")) else None,
- input,
- output,
- shuffleRead,
- shuffleWrite,
- bytesSpilled,
- info.errorMessage.getOrElse(""),
- executorLogs(info.executorId))
+ _tasksToShow
}
- private def executorLogs(id: String): Map[String, String] = {
+ def tasks: Seq[TaskData] = _tasksToShow
+
+ def executorLogs(id: String): Map[String, String] = {
executorIdToLogs.getOrElseUpdate(id,
store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
}
- /**
- * Return Ordering according to sortColumn and desc
- */
- private def ordering(sortColumn: String, desc: Boolean): Ordering[TaskTableRowData] = {
- val ordering: Ordering[TaskTableRowData] = sortColumn match {
- case "Index" => Ordering.by(_.index)
- case "ID" => Ordering.by(_.taskId)
- case "Attempt" => Ordering.by(_.attempt)
- case "Status" => Ordering.by(_.status)
- case "Locality Level" => Ordering.by(_.taskLocality)
- case "Executor ID" => Ordering.by(_.executorId)
- case "Host" => Ordering.by(_.host)
- case "Launch Time" => Ordering.by(_.launchTime)
- case "Duration" => Ordering.by(_.duration)
- case "Scheduler Delay" => Ordering.by(_.schedulerDelay)
- case "Task Deserialization Time" => Ordering.by(_.taskDeserializationTime)
- case "GC Time" => Ordering.by(_.gcTime)
- case "Result Serialization Time" => Ordering.by(_.serializationTime)
- case "Getting Result Time" => Ordering.by(_.gettingResultTime)
- case "Peak Execution Memory" => Ordering.by(_.peakExecutionMemoryUsed)
- case "Accumulators" =>
- if (hasAccumulators) {
- Ordering.by(_.accumulators.get)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Accumulators because of no accumulators")
- }
- case "Input Size / Records" =>
- if (hasInput) {
- Ordering.by(_.input.get.inputSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Input Size / Records because of no inputs")
- }
- case "Output Size / Records" =>
- if (hasOutput) {
- Ordering.by(_.output.get.outputSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Output Size / Records because of no outputs")
- }
- // ShuffleRead
- case "Shuffle Read Blocked Time" =>
- if (hasShuffleRead) {
- Ordering.by(_.shuffleRead.get.shuffleReadBlockedTimeSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Read Blocked Time because of no shuffle reads")
- }
- case "Shuffle Read Size / Records" =>
- if (hasShuffleRead) {
- Ordering.by(_.shuffleRead.get.shuffleReadSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Read Size / Records because of no shuffle reads")
- }
- case "Shuffle Remote Reads" =>
- if (hasShuffleRead) {
- Ordering.by(_.shuffleRead.get.shuffleReadRemoteSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Remote Reads because of no shuffle reads")
- }
- // ShuffleWrite
- case "Write Time" =>
- if (hasShuffleWrite) {
- Ordering.by(_.shuffleWrite.get.writeTimeSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Write Time because of no shuffle writes")
- }
- case "Shuffle Write Size / Records" =>
- if (hasShuffleWrite) {
- Ordering.by(_.shuffleWrite.get.shuffleWriteSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Write Size / Records because of no shuffle writes")
- }
- // BytesSpilled
- case "Shuffle Spill (Memory)" =>
- if (hasBytesSpilled) {
- Ordering.by(_.bytesSpilled.get.memoryBytesSpilledSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Spill (Memory) because of no spills")
- }
- case "Shuffle Spill (Disk)" =>
- if (hasBytesSpilled) {
- Ordering.by(_.bytesSpilled.get.diskBytesSpilledSortable)
- } else {
- throw new IllegalArgumentException(
- "Cannot sort by Shuffle Spill (Disk) because of no spills")
- }
- case "Errors" => Ordering.by(_.error)
- case unknownColumn => throw new IllegalArgumentException(s"Unknown column: $unknownColumn")
- }
- if (desc) {
- ordering.reverse
- } else {
- ordering
- }
- }
-
}
private[ui] class TaskPagedTable(
- conf: SparkConf,
+ stage: StageData,
basePath: String,
- data: Seq[TaskData],
- hasAccumulators: Boolean,
- hasInput: Boolean,
- hasOutput: Boolean,
- hasShuffleRead: Boolean,
- hasShuffleWrite: Boolean,
- hasBytesSpilled: Boolean,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
- store: AppStatusStore) extends PagedTable[TaskTableRowData] {
+ store: AppStatusStore) extends PagedTable[TaskData] {
+
+ import ApiHelper._
override def tableId: String = "task-table"
@@ -1142,13 +718,7 @@ private[ui] class TaskPagedTable(
override def pageNumberFormField: String = "task.page"
override val dataSource: TaskDataSource = new TaskDataSource(
- data,
- hasAccumulators,
- hasInput,
- hasOutput,
- hasShuffleRead,
- hasShuffleWrite,
- hasBytesSpilled,
+ stage,
currentTime,
pageSize,
sortColumn,
@@ -1180,22 +750,22 @@ private[ui] class TaskPagedTable(
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME),
("Peak Execution Memory", TaskDetailsClassNames.PEAK_EXECUTION_MEMORY)) ++
- {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
- {if (hasInput) Seq(("Input Size / Records", "")) else Nil} ++
- {if (hasOutput) Seq(("Output Size / Records", "")) else Nil} ++
- {if (hasShuffleRead) {
+ {if (hasAccumulators(stage)) Seq(("Accumulators", "")) else Nil} ++
+ {if (hasInput(stage)) Seq(("Input Size / Records", "")) else Nil} ++
+ {if (hasOutput(stage)) Seq(("Output Size / Records", "")) else Nil} ++
+ {if (hasShuffleRead(stage)) {
Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME),
("Shuffle Read Size / Records", ""),
("Shuffle Remote Reads", TaskDetailsClassNames.SHUFFLE_READ_REMOTE_SIZE))
} else {
Nil
}} ++
- {if (hasShuffleWrite) {
+ {if (hasShuffleWrite(stage)) {
Seq(("Write Time", ""), ("Shuffle Write Size / Records", ""))
} else {
Nil
}} ++
- {if (hasBytesSpilled) {
+ {if (hasBytesSpilled(stage)) {
Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", ""))
} else {
Nil
@@ -1237,7 +807,17 @@ private[ui] class TaskPagedTable(
{headerRow}
}
- def row(task: TaskTableRowData): Seq[Node] = {
+ def row(task: TaskData): Seq[Node] = {
+ def formatDuration(value: Option[Long], hideZero: Boolean = false): String = {
+ value.map { v =>
+ if (v > 0 || !hideZero) UIUtils.formatDuration(v) else ""
+ }.getOrElse("")
+ }
+
+ def formatBytes(value: Option[Long]): String = {
+ Utils.bytesToString(value.getOrElse(0L))
+ }
+
<tr>
<td>{task.index}</td>
<td>{task.taskId}</td>
@@ -1249,62 +829,98 @@ private[ui] class TaskPagedTable(
{task.host}
{
- task.logs.map {
+ dataSource.executorLogs(task.executorId).map {
case (logName, logUrl) =>
}
}
- <td>{UIUtils.formatDate(new Date(task.launchTime))}</td>
- <td>{task.formatDuration}</td>
+ <td>{UIUtils.formatDate(task.launchTime)}</td>
+ <td>{formatDuration(task.duration)}</td>
- {UIUtils.formatDuration(task.schedulerDelay)}
+ {UIUtils.formatDuration(AppStatusUtils.schedulerDelay(task))}
</td>
- {UIUtils.formatDuration(task.taskDeserializationTime)}
+ {formatDuration(task.taskMetrics.map(_.executorDeserializeTime))}
</td>
- {if (task.gcTime > 0) UIUtils.formatDuration(task.gcTime) else ""}
+ {formatDuration(task.taskMetrics.map(_.jvmGcTime), hideZero = true)}
</td>
- {UIUtils.formatDuration(task.serializationTime)}
+ {formatDuration(task.taskMetrics.map(_.resultSerializationTime))}
</td>
- {UIUtils.formatDuration(task.gettingResultTime)}
+ {UIUtils.formatDuration(AppStatusUtils.gettingResultTime(task))}
</td>
- {Utils.bytesToString(task.peakExecutionMemoryUsed)}
+ {formatBytes(task.taskMetrics.map(_.peakExecutionMemory))}
</td>
- {if (task.accumulators.nonEmpty) {
- <td>{Unparsed(task.accumulators.get)}</td>
+ {if (hasAccumulators(stage)) {
+ accumulatorsInfo(task)
}}
- {if (task.input.nonEmpty) {
- <td>{task.input.get.inputReadable}</td>
+ {if (hasInput(stage)) {
+ metricInfo(task) { m =>
+ val bytesRead = Utils.bytesToString(m.inputMetrics.bytesRead)
+ val records = m.inputMetrics.recordsRead
+ <td>{bytesRead} / {records}</td>
+ }
}}
- {if (task.output.nonEmpty) {
- <td>{task.output.get.outputReadable}</td>
+ {if (hasOutput(stage)) {
+ metricInfo(task) { m =>
+ val bytesWritten = Utils.bytesToString(m.outputMetrics.bytesWritten)
+ val records = m.outputMetrics.recordsWritten
+ <td>{bytesWritten} / {records}</td>
+ }
}}
- {if (task.shuffleRead.nonEmpty) {
+ {if (hasShuffleRead(stage)) {
- {task.shuffleRead.get.shuffleReadBlockedTimeReadable}
+ {formatDuration(task.taskMetrics.map(_.shuffleReadMetrics.fetchWaitTime))}
</td>
- <td>{task.shuffleRead.get.shuffleReadReadable}</td>
+ <td>{
+ metricInfo(task) { m =>
+ val bytesRead = Utils.bytesToString(totalBytesRead(m.shuffleReadMetrics))
+ val records = m.shuffleReadMetrics.recordsRead
+ Unparsed(s"$bytesRead / $records")
+ }
+ }</td>
- {task.shuffleRead.get.shuffleReadRemoteReadable}
+ {formatBytes(task.taskMetrics.map(_.shuffleReadMetrics.remoteBytesRead))}
</td>
}}
- {if (task.shuffleWrite.nonEmpty) {
- <td>{task.shuffleWrite.get.writeTimeReadable}</td>
- <td>{task.shuffleWrite.get.shuffleWriteReadable}</td>
+ {if (hasShuffleWrite(stage)) {
+ <td>{
+ formatDuration(
+ task.taskMetrics.map { m =>
+ TimeUnit.NANOSECONDS.toMillis(m.shuffleWriteMetrics.writeTime)
+ },
+ hideZero = true)
+ }</td>
+ <td>{
+ metricInfo(task) { m =>
+ val bytesWritten = Utils.bytesToString(m.shuffleWriteMetrics.bytesWritten)
+ val records = m.shuffleWriteMetrics.recordsWritten
+ Unparsed(s"$bytesWritten / $records")
+ }
+ }</td>
}}
- {if (task.bytesSpilled.nonEmpty) {
- <td>{task.bytesSpilled.get.memoryBytesSpilledReadable}</td>
- <td>{task.bytesSpilled.get.diskBytesSpilledReadable}</td>
+ {if (hasBytesSpilled(stage)) {
+ <td>{formatBytes(task.taskMetrics.map(_.memoryBytesSpilled))}</td>
+ <td>{formatBytes(task.taskMetrics.map(_.diskBytesSpilled))}</td>
}}
- {errorMessageCell(task.error)}
+ {errorMessageCell(task.errorMessage.getOrElse(""))}
}
+ private def accumulatorsInfo(task: TaskData): Seq[Node] = {
+ task.accumulatorUpdates.map { acc =>
+ Unparsed(StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update}"))
+ }
+ }
+
+ private def metricInfo(task: TaskData)(fn: TaskMetrics => Seq[Node]): Seq[Node] = {
+ task.taskMetrics.map(fn).getOrElse(Nil)
+ }
+
private def errorMessageCell(error: String): Seq[Node] = {
val isMultiline = error.indexOf('\n') >= 0
// Display the first line by default
@@ -1333,6 +949,36 @@ private[ui] class TaskPagedTable(
private object ApiHelper {
+
+ private val COLUMN_TO_INDEX = Map(
+ "ID" -> null.asInstanceOf[String],
+ "Index" -> TaskIndexNames.TASK_INDEX,
+ "Attempt" -> TaskIndexNames.ATTEMPT,
+ "Status" -> TaskIndexNames.STATUS,
+ "Locality Level" -> TaskIndexNames.LOCALITY,
+ "Executor ID / Host" -> TaskIndexNames.EXECUTOR,
+ "Launch Time" -> TaskIndexNames.LAUNCH_TIME,
+ "Duration" -> TaskIndexNames.DURATION,
+ "Scheduler Delay" -> TaskIndexNames.SCHEDULER_DELAY,
+ "Task Deserialization Time" -> TaskIndexNames.DESER_TIME,
+ "GC Time" -> TaskIndexNames.GC_TIME,
+ "Result Serialization Time" -> TaskIndexNames.SER_TIME,
+ "Getting Result Time" -> TaskIndexNames.GETTING_RESULT_TIME,
+ "Peak Execution Memory" -> TaskIndexNames.PEAK_MEM,
+ "Accumulators" -> TaskIndexNames.ACCUMULATORS,
+ "Input Size / Records" -> TaskIndexNames.INPUT_SIZE,
+ "Output Size / Records" -> TaskIndexNames.OUTPUT_SIZE,
+ "Shuffle Read Blocked Time" -> TaskIndexNames.SHUFFLE_READ_TIME,
+ "Shuffle Read Size / Records" -> TaskIndexNames.SHUFFLE_TOTAL_READS,
+ "Shuffle Remote Reads" -> TaskIndexNames.SHUFFLE_REMOTE_READS,
+ "Write Time" -> TaskIndexNames.SHUFFLE_WRITE_TIME,
+ "Shuffle Write Size / Records" -> TaskIndexNames.SHUFFLE_WRITE_SIZE,
+ "Shuffle Spill (Memory)" -> TaskIndexNames.MEM_SPILL,
+ "Shuffle Spill (Disk)" -> TaskIndexNames.DISK_SPILL,
+ "Errors" -> TaskIndexNames.ERROR)
+
+ def hasAccumulators(stageData: StageData): Boolean = stageData.accumulatorUpdates.size > 0
+
def hasInput(stageData: StageData): Boolean = stageData.inputBytes > 0
def hasOutput(stageData: StageData): Boolean = stageData.outputBytes > 0
@@ -1349,4 +995,11 @@ private object ApiHelper {
metrics.localBytesRead + metrics.remoteBytesRead
}
+ def indexName(sortColumn: String): Option[String] = {
+ COLUMN_TO_INDEX.get(sortColumn) match {
+ case Some(v) => Option(v)
+ case _ => throw new IllegalArgumentException(s"Invalid sort column: $sortColumn")
+ }
+ }
+
}
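
A note on the sorting plumbing added above: COLUMN_TO_INDEX deliberately maps the "ID" column to null so that indexName returns None (sort by the store's primary task index), while any column missing from the map fails fast. A tiny sketch of that Option(null) contract, with a hypothetical index name used purely for illustration:

    object IndexNameSketch {
      private val columnToIndex: Map[String, String] = Map(
        "ID" -> null.asInstanceOf[String],
        "Duration" -> "dur")  // "dur" is a made-up index name, not Spark's

      def indexName(sortColumn: String): Option[String] =
        columnToIndex.get(sortColumn) match {
          case Some(v) => Option(v)  // Option(null) collapses to None
          case _ => throw new IllegalArgumentException(s"Invalid sort column: $sortColumn")
        }

      def main(args: Array[String]): Unit = {
        println(indexName("ID"))        // None: natural ordering by task id
        println(indexName("Duration"))  // Some(dur)
      }
    }
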
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index be05a963f0e6..10b032084ce4 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -37,10 +37,10 @@ private[ui] class StagesTab(val parent: SparkUI, val store: AppStatusStore)
attachPage(new PoolPage(this))
def isFairScheduler: Boolean = {
- store.environmentInfo().sparkProperties.toMap
- .get("spark.scheduler.mode")
- .map { mode => mode == SchedulingMode.FAIR }
- .getOrElse(false)
+ store
+ .environmentInfo()
+ .sparkProperties
+ .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
}
def handleKillRequest(request: HttpServletRequest): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index 827a8637b9bd..948858224d72 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -116,7 +116,7 @@ private[spark] object RDDOperationGraph extends Logging {
// Use a special prefix here to differentiate this cluster from other operation clusters
val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
val stageClusterName = s"Stage ${stage.stageId}" +
- { if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
+ { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" }
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
var rootNodeCount = 0
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 5e60218c5740..ff83301d631c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -263,7 +263,7 @@ private[spark] object JsonProtocol {
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
("Stage ID" -> stageInfo.stageId) ~
- ("Stage Attempt ID" -> stageInfo.attemptId) ~
+ ("Stage Attempt ID" -> stageInfo.attemptNumber) ~
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index c2261c204cd4..dffa609f1cbd 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -23,7 +23,9 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
@@ -119,7 +121,8 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}
- @Test
+ // TODO: [SPARK-23020] Re-enable this
+ @Ignore
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
@@ -133,6 +136,10 @@ public void testInProcessLauncher() throws Exception {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
+ // Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
+ // Wait for a reasonable amount of time to avoid creating two active SparkContexts in the JVM.
+ // See SPARK-23019 and SparkContext.stop() for details.
+ TimeUnit.MILLISECONDS.sleep(500);
}
}
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 46b0516e3614..a0664b30d6cc 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -21,6 +21,7 @@
import org.junit.Test;
import org.apache.spark.SparkConf;
+import org.apache.spark.unsafe.memory.MemoryAllocator;
import org.apache.spark.unsafe.memory.MemoryBlock;
public class TaskMemoryManagerSuite {
@@ -68,6 +69,34 @@ public void encodePageNumberAndOffsetOnHeap() {
Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
}
+ @Test
+ public void freeingPageSetsPageNumberToSpecialConstant() {
+ final TaskMemoryManager manager = new TaskMemoryManager(
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
+ c.freePage(dataPage);
+ Assert.assertEquals(MemoryBlock.FREED_IN_ALLOCATOR_PAGE_NUMBER, dataPage.pageNumber);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void freeingPageDirectlyInAllocatorTriggersAssertionError() {
+ final TaskMemoryManager manager = new TaskMemoryManager(
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
+ MemoryAllocator.HEAP.free(dataPage);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void callingFreePageOnDirectlyAllocatedPageTriggersAssertionError() {
+ final TaskMemoryManager manager = new TaskMemoryManager(
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ final MemoryBlock dataPage = MemoryAllocator.HEAP.allocate(256);
+ manager.freePage(dataPage, c);
+ }
+
@Test
public void cooperativeSpilling() {
final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
diff --git a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
index 94f5805853e1..f8e233a05a44 100644
--- a/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
+++ b/core/src/test/java/test/org/apache/spark/JavaTaskContextCompileCheck.java
@@ -38,6 +38,7 @@ public static void test() {
tc.attemptNumber();
tc.partitionId();
tc.stageId();
+ tc.stageAttemptNumber();
tc.taskAttemptId();
}
@@ -51,6 +52,7 @@ public void onTaskCompletion(TaskContext context) {
context.isCompleted();
context.isInterrupted();
context.stageId();
+ context.stageAttemptNumber();
context.partitionId();
context.addTaskCompletionListener(this);
}
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
index f8e27703c0de..5c42ac1d87f4 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w__custom_quantiles_expectation.json
@@ -7,6 +7,9 @@
"resultSize" : [ 2010.0, 2065.0, 2065.0 ],
"jvmGcTime" : [ 0.0, 0.0, 7.0 ],
"resultSerializationTime" : [ 0.0, 0.0, 2.0 ],
+ "gettingResultTime" : [ 0.0, 0.0, 0.0 ],
+ "schedulerDelay" : [ 2.0, 6.0, 53.0 ],
+ "peakExecutionMemory" : [ 0.0, 0.0, 0.0 ],
"memoryBytesSpilled" : [ 0.0, 0.0, 0.0 ],
"diskBytesSpilled" : [ 0.0, 0.0, 0.0 ],
"inputMetrics" : {
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
index a28bda16a956..e6b705989cc9 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_read_expectation.json
@@ -7,6 +7,9 @@
"resultSize" : [ 1034.0, 1034.0, 1034.0, 1034.0, 1034.0 ],
"jvmGcTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "gettingResultTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "schedulerDelay" : [ 4.0, 4.0, 6.0, 7.0, 9.0 ],
+ "peakExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"inputMetrics" : {
diff --git a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
index ede3eaed1d1d..788f28cf7b36 100644
--- a/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/stage_task_summary_w_shuffle_write_expectation.json
@@ -7,6 +7,9 @@
"resultSize" : [ 2010.0, 2065.0, 2065.0, 2065.0, 2065.0 ],
"jvmGcTime" : [ 0.0, 0.0, 0.0, 5.0, 7.0 ],
"resultSerializationTime" : [ 0.0, 0.0, 0.0, 0.0, 1.0 ],
+ "gettingResultTime" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
+ "schedulerDelay" : [ 2.0, 4.0, 6.0, 13.0, 40.0 ],
+ "peakExecutionMemory" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"memoryBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"diskBytesSpilled" : [ 0.0, 0.0, 0.0, 0.0, 0.0 ],
"inputMetrics" : {
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 3931d53b4ae0..ced5a06516f7 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -363,14 +363,14 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
// first attempt -- it's successful
val writer1 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(0, 0, 0, 0L, 0, taskMemoryManager, new Properties, metricsSystem))
val data1 = (1 to 10).map { x => x -> x}
// second attempt -- also successful. We'll write out different data,
// just to simulate the fact that the records may get written differently
// depending on what gets spilled, what gets combined, etc.
val writer2 = manager.getWriter[Int, Int](shuffleHandle, 0,
- new TaskContextImpl(0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(0, 0, 0, 1L, 0, taskMemoryManager, new Properties, metricsSystem))
val data2 = (11 to 20).map { x => x -> x}
// interleave writes of both attempts -- we want to test that both attempts can occur
@@ -398,7 +398,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
}
val reader = manager.getReader[Int, Int](shuffleHandle, 0, 1,
- new TaskContextImpl(1, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
+ new TaskContextImpl(1, 0, 0, 2L, 0, taskMemoryManager, new Properties, metricsSystem))
val readData = reader.read().toIndexedSeq
assert(readData === data1.toIndexedSeq || readData === data2.toIndexedSeq)
diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
index 159629825c67..9ad2e9a5e74a 100644
--- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
+++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala
@@ -153,6 +153,40 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with Encryptio
assert(broadcast.value.sum === 10)
}
+ test("One broadcast value instance per executor") {
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("test")
+
+ sc = new SparkContext(conf)
+ val list = List[Int](1, 2, 3, 4)
+ val broadcast = sc.broadcast(list)
+ val instances = sc.parallelize(1 to 10)
+ .map(x => System.identityHashCode(broadcast.value))
+ .collect()
+ .toSet
+
+ assert(instances.size === 1)
+ }
+
+ test("One broadcast value instance per executor when memory is constrained") {
+ val conf = new SparkConf()
+ .setMaster("local[4]")
+ .setAppName("test")
+ .set("spark.memory.useLegacyMode", "true")
+ .set("spark.storage.memoryFraction", "0.0")
+
+ sc = new SparkContext(conf)
+ val list = List[Int](1, 2, 3, 4)
+ val broadcast = sc.broadcast(list)
+ val instances = sc.parallelize(1 to 10)
+ .map(x => System.identityHashCode(broadcast.value))
+ .collect()
+ .toSet
+
+ assert(instances.size === 1)
+ }
+
/**
* Verify the persistence of state associated with a TorrentBroadcast in a local-cluster.
*
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
index 362cd861cc24..dcf89e4f75ac 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryTestingUtils.scala
@@ -29,6 +29,7 @@ object MemoryTestingUtils {
val taskMemoryManager = new TaskMemoryManager(env.memoryManager, 0)
new TaskContextImpl(
stageId = 0,
+ stageAttemptNumber = 0,
partitionId = 0,
taskAttemptId = 0,
attemptNumber = 0,
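
The extra leading 0 added to the positional TaskContextImpl calls in the test diffs corresponds to the new stageAttemptNumber parameter that, judging from the named arguments here, now sits right after stageId. An illustrative stand-in (not Spark's class) showing why every call site gained one argument:

    object TaskContextParamSketch {
      // Parameter order mirrors the named arguments above; everything else is omitted.
      class FakeTaskContext(
          val stageId: Int,
          val stageAttemptNumber: Int,  // newly inserted second parameter (assumption)
          val partitionId: Int,
          val taskAttemptId: Long,
          val attemptNumber: Int)

      // An old call like new TaskContextImpl(0, 0, 2L, 0, ...) becomes (0, 0, 0, 2L, 0, ...).
      val ctx = new FakeTaskContext(0, 0, 0, 2L, 0)
    }
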
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 1beb36afa95f..da6ecb82c7e4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
}
- private def queueSize(bus: LiveListenerBus): Int = {
+ private def sharedQueueSize(bus: LiveListenerBus): Int = {
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
.asInstanceOf[Int]
}
@@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val conf = new SparkConf()
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
- bus.addToSharedQueue(counter)
// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
assert(numDroppedEvents(bus) === 0)
- assert(queueSize(bus) === 0)
+ assert(bus.queuedEvents.size === 0)
assert(eventProcessingTimeCount(bus) === 0)
// Post five events:
@@ -87,7 +86,10 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Five messages should be marked as received and queued, but no messages should be posted to
// listeners yet because the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
- assert(queueSize(bus) === 5)
+ assert(bus.queuedEvents.size === 5)
+
+ // Add the counter to the bus after messages have been queued for later delivery.
+ bus.addToSharedQueue(counter)
assert(counter.count === 0)
// Starting listener bus should flush all buffered events
@@ -95,9 +97,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
- assert(queueSize(bus) === 0)
+ assert(sharedQueueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)
+ // After the bus is started, there should be no more queued events.
+ assert(bus.queuedEvents === null)
+
// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
- assert(queueSize(bus) === 0)
+ assert(sharedQueueSize(bus) === 0)
assert(numDroppedEvents(bus) === 0)
// If we post an additional message then it should remain in the queue because the listener is
// busy processing the first event:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
- assert(queueSize(bus) === 1)
+ assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 0)
// The queue is now full, so any additional events posted to the listener will be dropped:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
- assert(queueSize(bus) === 1)
+ assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 1)
// Allow the remaining events to be processed so we can stop the listener bus:
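Note (illustrative only, not part of the patch): the SparkListenerSuite changes above pin down the new LiveListenerBus behavior in which events posted before start() are held in queuedEvents and replayed to every queue once the bus starts, so a listener registered after the events were posted still receives them. A minimal sketch of that pattern, assuming the suite's BasicJobCounter listener, a running SparkContext named sc, and the imports the suite already uses:

    val bus = new LiveListenerBus(new SparkConf())
    // Posting before start() only buffers the event in queuedEvents; no listener sees it yet.
    bus.post(SparkListenerJobEnd(0, System.currentTimeMillis(), JobSucceeded))
    // A listener added after the post still receives the buffered event once the bus starts.
    bus.addToSharedQueue(new BasicJobCounter())
    bus.start(sc, sc.env.metricsSystem)
    bus.waitUntilEmpty(1000)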
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
index a1d9085fa085..aa9c36c0aaac 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskContextSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.memory.TaskMemoryManager
import org.apache.spark.metrics.source.JvmSource
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.rdd.RDD
+import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util._
class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSparkContext {
@@ -158,6 +159,30 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
assert(attemptIdsWithFailedTask.toSet === Set(0, 1))
}
+ test("TaskContext.stageAttemptNumber getter") {
+ sc = new SparkContext("local[1,2]", "test")
+
+ // Check stageAttemptNumbers are 0 for initial stage
+ val stageAttemptNumbers = sc.parallelize(Seq(1, 2), 2).mapPartitions { _ =>
+ Seq(TaskContext.get().stageAttemptNumber()).iterator
+ }.collect()
+ assert(stageAttemptNumbers.toSet === Set(0))
+
+ // Check stageAttemptNumbers that are resubmitted when tasks have FetchFailedException
+ val stageAttemptNumbersWithFailedStage =
+ sc.parallelize(Seq(1, 2, 3, 4), 4).repartition(1).mapPartitions { _ =>
+ val stageAttemptNumber = TaskContext.get().stageAttemptNumber()
+ if (stageAttemptNumber < 2) {
+ // Throw FetchFailedException to explicitly trigger stage resubmission. A normal exception
+ // will only trigger task resubmission in the same stage.
+ throw new FetchFailedException(null, 0, 0, 0, "Fake")
+ }
+ Seq(stageAttemptNumber).iterator
+ }.collect()
+
+ assert(stageAttemptNumbersWithFailedStage.toSet === Set(2))
+ }
+
test("accumulators are updated on exception failures") {
// This means use 1 core and 4 max task failures
sc = new SparkContext("local[1,4]", "test")
@@ -190,7 +215,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
// accumulator updates from it.
val taskMetrics = TaskMetrics.empty
val task = new Task[Int](0, 0, 0) {
- context = new TaskContextImpl(0, 0, 0L, 0,
+ context = new TaskContextImpl(0, 0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
new Properties,
SparkEnv.get.metricsSystem,
@@ -213,7 +238,7 @@ class TaskContextSuite extends SparkFunSuite with BeforeAndAfter with LocalSpark
// accumulator updates from it.
val taskMetrics = TaskMetrics.registered
val task = new Task[Int](0, 0, 0) {
- context = new TaskContextImpl(0, 0, 0L, 0,
+ context = new TaskContextImpl(0, 0, 0, 0L, 0,
new TaskMemoryManager(SparkEnv.get.memoryManager, 0L),
new Properties,
SparkEnv.get.metricsSystem,
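For reference (illustrative, not part of the patch): the stageAttemptNumber() getter exercised in the new TaskContextSuite test is read from task code through TaskContext.get(); a minimal sketch, assuming an existing RDD named rdd:

    rdd.mapPartitions { iter =>
      val ctx = TaskContext.get()
      // stageAttemptNumber() is 0 on the first attempt of a stage and is incremented when the
      // whole stage is resubmitted (e.g. after a FetchFailedException), unlike attemptNumber(),
      // which only tracks retries of an individual task.
      Iterator.single((ctx.stageId(), ctx.stageAttemptNumber()))
    }.collect()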
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index 997c7de8dd02..ca66b6b9db89 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -195,7 +195,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val s1Tasks = createTasks(4, execIds)
s1Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId,
+ stages.head.attemptNumber,
+ task))
}
assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size)
@@ -211,55 +213,53 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
s1Tasks.foreach { task =>
check[TaskDataWrapper](task.taskId) { wrapper =>
- assert(wrapper.info.taskId === task.taskId)
+ assert(wrapper.taskId === task.taskId)
assert(wrapper.stageId === stages.head.stageId)
assert(wrapper.stageAttemptId === stages.head.attemptId)
- assert(Arrays.equals(wrapper.stage, Array(stages.head.stageId, stages.head.attemptId)))
-
- val runtime = Array[AnyRef](stages.head.stageId: JInteger, stages.head.attemptId: JInteger,
- -1L: JLong)
- assert(Arrays.equals(wrapper.runtime, runtime))
-
- assert(wrapper.info.index === task.index)
- assert(wrapper.info.attempt === task.attemptNumber)
- assert(wrapper.info.launchTime === new Date(task.launchTime))
- assert(wrapper.info.executorId === task.executorId)
- assert(wrapper.info.host === task.host)
- assert(wrapper.info.status === task.status)
- assert(wrapper.info.taskLocality === task.taskLocality.toString())
- assert(wrapper.info.speculative === task.speculative)
+ assert(wrapper.index === task.index)
+ assert(wrapper.attempt === task.attemptNumber)
+ assert(wrapper.launchTime === task.launchTime)
+ assert(wrapper.executorId === task.executorId)
+ assert(wrapper.host === task.host)
+ assert(wrapper.status === task.status)
+ assert(wrapper.taskLocality === task.taskLocality.toString())
+ assert(wrapper.speculative === task.speculative)
}
}
- // Send executor metrics update. Only update one metric to avoid a lot of boilerplate code.
- s1Tasks.foreach { task =>
- val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
- Some(1L), None, true, false, None)
- listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
- task.executorId,
- Seq((task.taskId, stages.head.stageId, stages.head.attemptId, Seq(accum)))))
- }
+ // Send two executor metrics updates. Only update one metric to avoid a lot of boilerplate code.
+ // The tasks are distributed among the two executors, so the executor-level metrics should
+ // hold half of the cumulative value of the metric being updated.
+ Seq(1L, 2L).foreach { value =>
+ s1Tasks.foreach { task =>
+ val accum = new AccumulableInfo(1L, Some(InternalAccumulator.MEMORY_BYTES_SPILLED),
+ Some(value), None, true, false, None)
+ listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(
+ task.executorId,
+ Seq((task.taskId, stages.head.stageId, stages.head.attemptNumber, Seq(accum)))))
+ }
- check[StageDataWrapper](key(stages.head)) { stage =>
- assert(stage.info.memoryBytesSpilled === s1Tasks.size)
- }
+ check[StageDataWrapper](key(stages.head)) { stage =>
+ assert(stage.info.memoryBytesSpilled === s1Tasks.size * value)
+ }
- val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
- .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
- assert(execs.size > 0)
- execs.foreach { exec =>
- assert(exec.info.memoryBytesSpilled === s1Tasks.size / 2)
+ val execs = store.view(classOf[ExecutorStageSummaryWrapper]).index("stage")
+ .first(key(stages.head)).last(key(stages.head)).asScala.toSeq
+ assert(execs.size > 0)
+ execs.foreach { exec =>
+ assert(exec.info.memoryBytesSpilled === s1Tasks.size * value / 2)
+ }
}
// Fail one of the tasks, re-start it.
time += 1
s1Tasks.head.markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", TaskResultLost, s1Tasks.head, null))
time += 1
val reattempt = newAttempt(s1Tasks.head, nextTaskId())
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
reattempt))
assert(store.count(classOf[TaskDataWrapper]) === s1Tasks.size + 1)
@@ -275,13 +275,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
check[TaskDataWrapper](s1Tasks.head.taskId) { task =>
- assert(task.info.status === s1Tasks.head.status)
- assert(task.info.errorMessage == Some(TaskResultLost.toErrorString))
+ assert(task.status === s1Tasks.head.status)
+ assert(task.errorMessage == Some(TaskResultLost.toErrorString))
}
check[TaskDataWrapper](reattempt.taskId) { task =>
- assert(task.info.index === s1Tasks.head.index)
- assert(task.info.attempt === reattempt.attemptNumber)
+ assert(task.index === s1Tasks.head.index)
+ assert(task.attempt === reattempt.attemptNumber)
}
// Kill one task, restart it.
@@ -289,7 +289,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val killed = s1Tasks.drop(1).head
killed.finishTime = time
killed.failed = true
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", TaskKilled("killed"), killed, null))
check[JobDataWrapper](1) { job =>
@@ -303,21 +303,21 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
check[TaskDataWrapper](killed.taskId) { task =>
- assert(task.info.index === killed.index)
- assert(task.info.errorMessage === Some("killed"))
+ assert(task.index === killed.index)
+ assert(task.errorMessage === Some("killed"))
}
// Start a new attempt and finish it with TaskCommitDenied, make sure it's handled like a kill.
time += 1
val denied = newAttempt(killed, nextTaskId())
val denyReason = TaskCommitDenied(1, 1, 1)
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
denied))
time += 1
denied.finishTime = time
denied.failed = true
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", denyReason, denied, null))
check[JobDataWrapper](1) { job =>
@@ -331,13 +331,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
check[TaskDataWrapper](denied.taskId) { task =>
- assert(task.info.index === killed.index)
- assert(task.info.errorMessage === Some(denyReason.toErrorString))
+ assert(task.index === killed.index)
+ assert(task.errorMessage === Some(denyReason.toErrorString))
}
// Start a new attempt.
val reattempt2 = newAttempt(denied, nextTaskId())
- listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptNumber,
reattempt2))
// Succeed all tasks in stage 1.
@@ -350,7 +350,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
pending.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.head.stageId, stages.head.attemptNumber,
"taskType", Success, task, s1Metrics))
}
@@ -370,10 +370,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
pending.foreach { task =>
check[TaskDataWrapper](task.taskId) { wrapper =>
- assert(wrapper.info.errorMessage === None)
- assert(wrapper.info.taskMetrics.get.executorCpuTime === 2L)
- assert(wrapper.info.taskMetrics.get.executorRunTime === 4L)
- assert(wrapper.info.duration === Some(task.duration))
+ assert(wrapper.errorMessage === None)
+ assert(wrapper.executorCpuTime === 2L)
+ assert(wrapper.executorRunTime === 4L)
+ assert(wrapper.duration === task.duration)
}
}
@@ -414,13 +414,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
val s2Tasks = createTasks(4, execIds)
s2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId,
+ stages.last.attemptNumber,
+ task))
}
time += 1
s2Tasks.foreach { task =>
task.markFinished(TaskState.FAILED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(stages.last.stageId, stages.last.attemptNumber,
"taskType", TaskResultLost, task, null))
}
@@ -455,7 +457,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// - Re-submit stage 2, all tasks, and succeed them and the stage.
val oldS2 = stages.last
- val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptId + 1, oldS2.name, oldS2.numTasks,
+ val newS2 = new StageInfo(oldS2.stageId, oldS2.attemptNumber + 1, oldS2.name, oldS2.numTasks,
oldS2.rddInfos, oldS2.parentIds, oldS2.details, oldS2.taskMetrics)
time += 1
@@ -466,14 +468,14 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val newS2Tasks = createTasks(4, execIds)
newS2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptNumber, task))
}
time += 1
newS2Tasks.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptId, "taskType", Success,
- task, null))
+ listener.onTaskEnd(SparkListenerTaskEnd(newS2.stageId, newS2.attemptNumber, "taskType",
+ Success, task, null))
}
time += 1
@@ -522,14 +524,15 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
val j2s2Tasks = createTasks(4, execIds)
j2s2Tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
+ listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId,
+ j2Stages.last.attemptNumber,
task))
}
time += 1
j2s2Tasks.foreach { task =>
task.markFinished(TaskState.FINISHED, time)
- listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptId,
+ listener.onTaskEnd(SparkListenerTaskEnd(j2Stages.last.stageId, j2Stages.last.attemptNumber,
"taskType", Success, task, null))
}
@@ -888,6 +891,23 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(store.count(classOf[StageDataWrapper]) === 3)
assert(store.count(classOf[RDDOperationGraphWrapper]) === 3)
+ val dropped = stages.drop(1).head
+
+ // Cache some quantiles by calling AppStatusStore.taskSummary(). For quantiles to be
+ // calculated, we need at least one finished task.
+ time += 1
+ val task = createTasks(1, Array("1")).head
+ listener.onTaskStart(SparkListenerTaskStart(dropped.stageId, dropped.attemptId, task))
+
+ time += 1
+ task.markFinished(TaskState.FINISHED, time)
+ listener.onTaskEnd(SparkListenerTaskEnd(dropped.stageId, dropped.attemptId,
+ "taskType", Success, task, null))
+
+ new AppStatusStore(store)
+ .taskSummary(dropped.stageId, dropped.attemptId, Array(0.25d, 0.50d, 0.75d))
+ assert(store.count(classOf[CachedQuantile], "stage", key(dropped)) === 3)
+
stages.drop(1).foreach { s =>
time += 1
s.completionTime = Some(time)
@@ -899,6 +919,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
intercept[NoSuchElementException] {
store.read(classOf[StageDataWrapper], Array(2, 0))
}
+ assert(store.count(classOf[CachedQuantile], "stage", key(dropped)) === 0)
val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3")
time += 1
@@ -919,13 +940,13 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
time += 1
val tasks = createTasks(2, Array("1"))
tasks.foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
// Start a 3rd task. The finished tasks should be deleted.
createTasks(1, Array("1")).foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
@@ -934,7 +955,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Start a 4th task. The first task should be deleted, even if it's still running.
createTasks(1, Array("1")).foreach { task =>
- listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
+ listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptNumber, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
@@ -960,7 +981,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
- private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
+ private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptNumber)
private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
val value = store.read(classTag[T].runtimeClass, key).asInstanceOf[T]
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
new file mode 100644
index 000000000000..92f90f3d96dd
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.api.v1.TaskMetricDistributions
+import org.apache.spark.util.Distribution
+import org.apache.spark.util.kvstore._
+
+class AppStatusStoreSuite extends SparkFunSuite {
+
+ private val uiQuantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
+ private val stageId = 1
+ private val attemptId = 1
+
+ test("quantile calculation: 1 task") {
+ compareQuantiles(1, uiQuantiles)
+ }
+
+ test("quantile calculation: few tasks") {
+ compareQuantiles(4, uiQuantiles)
+ }
+
+ test("quantile calculation: more tasks") {
+ compareQuantiles(100, uiQuantiles)
+ }
+
+ test("quantile calculation: lots of tasks") {
+ compareQuantiles(4096, uiQuantiles)
+ }
+
+ test("quantile calculation: custom quantiles") {
+ compareQuantiles(4096, Array(0.01, 0.33, 0.5, 0.42, 0.69, 0.99))
+ }
+
+ test("quantile cache") {
+ val store = new InMemoryStore()
+ (0 until 4096).foreach { i => store.write(newTaskData(i)) }
+
+ val appStore = new AppStatusStore(store)
+
+ appStore.taskSummary(stageId, attemptId, Array(0.13d))
+ intercept[NoSuchElementException] {
+ store.read(classOf[CachedQuantile], Array(stageId, attemptId, "13"))
+ }
+
+ appStore.taskSummary(stageId, attemptId, Array(0.25d))
+ val d1 = store.read(classOf[CachedQuantile], Array(stageId, attemptId, "25"))
+
+ // Add a new task to force the cached quantile to be evicted, and make sure it's updated.
+ store.write(newTaskData(4096))
+ appStore.taskSummary(stageId, attemptId, Array(0.25d, 0.50d, 0.73d))
+
+ val d2 = store.read(classOf[CachedQuantile], Array(stageId, attemptId, "25"))
+ assert(d1.taskCount != d2.taskCount)
+
+ store.read(classOf[CachedQuantile], Array(stageId, attemptId, "50"))
+ intercept[NoSuchElementException] {
+ store.read(classOf[CachedQuantile], Array(stageId, attemptId, "73"))
+ }
+
+ assert(store.count(classOf[CachedQuantile]) === 2)
+ }
+
+ private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
+ val store = new InMemoryStore()
+ val values = (0 until count).map { i =>
+ val task = newTaskData(i)
+ store.write(task)
+ i.toDouble
+ }.toArray
+
+ val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, quantiles).get
+ val dist = new Distribution(values, 0, values.length).getQuantiles(quantiles.sorted)
+
+ dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+ assert(expected === actual)
+ }
+ }
+
+ private def newTaskData(i: Int): TaskDataWrapper = {
+ new TaskDataWrapper(
+ i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+ i, i, i, i, i, i, i, i, i, i,
+ i, i, i, i, i, i, i, i, i, i,
+ i, i, i, i, stageId, attemptId)
+ }
+
+}
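A short usage note on the new suite (illustrative, not part of the patch): AppStatusStore.taskSummary() both computes the per-metric quantiles from the stored TaskDataWrapper rows and, for the commonly requested quantiles, persists them as CachedQuantile entries keyed by stage id, attempt id and quantile (arbitrary values such as 0.13 are not cached, as the "quantile cache" test shows). A rough sketch against a KVStore populated as in newTaskData above:

    val appStore = new AppStatusStore(store)
    // The first call scans the task data, builds the distributions and caches the standard quantiles.
    val summary = appStore.taskSummary(stageId, attemptId, Array(0.25, 0.5, 0.75))
    // Later calls for the same quantiles can be answered from the CachedQuantile entries instead of
    // rescanning every TaskDataWrapper.
    summary.foreach { s => println(s.executorRunTime.mkString(", ")) }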
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
index 917db766f7f1..9c0699bc981f 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockInfoManagerSuite.scala
@@ -62,7 +62,7 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
private def withTaskId[T](taskAttemptId: Long)(block: => T): T = {
try {
TaskContext.setTaskContext(
- new TaskContextImpl(0, 0, taskAttemptId, 0, null, new Properties, null))
+ new TaskContextImpl(0, 0, 0, taskAttemptId, 0, null, new Properties, null))
block
} finally {
TaskContext.unset()
diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
index 661d0d48d2f3..0aeddf730cd3 100644
--- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark._
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.config._
import org.apache.spark.ui.jobs.{StagePage, StagesTab}
class StagePageSuite extends SparkFunSuite with LocalSparkContext {
@@ -35,15 +36,13 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
private val peakExecutionMemory = 10
test("peak execution memory should displayed") {
- val conf = new SparkConf(false)
- val html = renderStagePage(conf).toString().toLowerCase(Locale.ROOT)
+ val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
val targetString = "peak execution memory"
assert(html.contains(targetString))
}
test("SPARK-10543: peak execution memory should be per-task rather than cumulative") {
- val conf = new SparkConf(false)
- val html = renderStagePage(conf).toString().toLowerCase(Locale.ROOT)
+ val html = renderStagePage().toString().toLowerCase(Locale.ROOT)
// verify min/25/50/75/max show task value not cumulative values
assert(html.contains(s"$peakExecutionMemory.0 b | " * 5))
}
@@ -52,7 +51,8 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
* Render a stage page started with the given conf and return the HTML.
* This also runs a dummy stage to populate the page with useful content.
*/
- private def renderStagePage(conf: SparkConf): Seq[Node] = {
+ private def renderStagePage(): Seq[Node] = {
+ val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
val statusStore = AppStatusStore.createLiveStore(conf)
val listener = statusStore.listener.get
diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh
index c71137468054..a3579f21fc53 100755
--- a/dev/create-release/release-build.sh
+++ b/dev/create-release/release-build.sh
@@ -92,9 +92,9 @@ MVN="build/mvn --force"
# Hive-specific profiles for some builds
HIVE_PROFILES="-Phive -Phive-thriftserver"
# Profiles for publishing snapshots and release to Maven Central
-PUBLISH_PROFILES="-Pmesos -Pyarn -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
+PUBLISH_PROFILES="-Pmesos -Pyarn -Pkubernetes -Pflume $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-asl"
# Profiles for building binary releases
-BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pflume -Psparkr"
+BASE_RELEASE_PROFILES="-Pmesos -Pyarn -Pkubernetes -Pflume -Psparkr"
# Scala 2.11 only profiles for some builds
SCALA_2_11_PROFILES="-Pkafka-0-8"
# Scala 2.12 only profiles for some builds
diff --git a/dev/create-release/releaseutils.py b/dev/create-release/releaseutils.py
index 730138195e5f..32f6cbb29f0b 100755
--- a/dev/create-release/releaseutils.py
+++ b/dev/create-release/releaseutils.py
@@ -185,6 +185,8 @@ def get_commits(tag):
"graphx": "GraphX",
"input/output": CORE_COMPONENT,
"java api": "Java API",
+ "k8s": "Kubernetes",
+ "kubernetes": "Kubernetes",
"mesos": "Mesos",
"ml": "MLlib",
"mllib": "MLlib",
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index a7fce2ede0ea..3b2a88537ca5 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -17,6 +17,7 @@ arpack_combined_all-0.1.jar
arrow-format-0.8.0.jar
arrow-memory-0.8.0.jar
arrow-vector-0.8.0.jar
+automaton-1.11-8.jar
avro-1.7.7.jar
avro-ipc-1.7.7.jar
avro-mapred-1.7.7-hadoop2.jar
@@ -60,6 +61,7 @@ datanucleus-rdbms-3.2.9.jar
derby-10.12.1.1.jar
eigenbase-properties-1.1.5.jar
flatbuffers-1.2.0-3f79e055.jar
+generex-1.0.1.jar
gson-2.2.4.jar
guava-14.0.1.jar
guice-3.0.jar
@@ -91,8 +93,10 @@ jackson-annotations-2.6.7.jar
jackson-core-2.6.7.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.6.7.1.jar
+jackson-dataformat-yaml-2.6.7.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
+jackson-module-jaxb-annotations-2.6.7.jar
jackson-module-paranamer-2.7.9.jar
jackson-module-scala_2.11-2.6.7.1.jar
jackson-xc-1.9.13.jar
@@ -130,10 +134,13 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-3.0.3.jar
+kubernetes-client-3.0.0.jar
+kubernetes-model-2.0.0.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
+logging-interceptor-3.8.1.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
@@ -146,6 +153,8 @@ minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
+okhttp-3.8.1.jar
+okio-1.13.0.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
orc-mapreduce-1.4.1-nohive.jar
@@ -171,6 +180,7 @@ scalap-2.11.8.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
+snakeyaml-1.15.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
spire-macros_2.11-0.13.0.jar
@@ -186,5 +196,6 @@ xbean-asm5-shaded-4.4.jar
xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
+zjsonpatch-0.3.0.jar
zookeeper-3.4.6.jar
zstd-jni-1.3.2-2.jar
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 94b2e98d85e7..757da00788e5 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -17,6 +17,7 @@ arpack_combined_all-0.1.jar
arrow-format-0.8.0.jar
arrow-memory-0.8.0.jar
arrow-vector-0.8.0.jar
+automaton-1.11-8.jar
avro-1.7.7.jar
avro-ipc-1.7.7.jar
avro-mapred-1.7.7-hadoop2.jar
@@ -60,6 +61,7 @@ datanucleus-rdbms-3.2.9.jar
derby-10.12.1.1.jar
eigenbase-properties-1.1.5.jar
flatbuffers-1.2.0-3f79e055.jar
+generex-1.0.1.jar
gson-2.2.4.jar
guava-14.0.1.jar
guice-3.0.jar
@@ -91,8 +93,10 @@ jackson-annotations-2.6.7.jar
jackson-core-2.6.7.jar
jackson-core-asl-1.9.13.jar
jackson-databind-2.6.7.1.jar
+jackson-dataformat-yaml-2.6.7.jar
jackson-jaxrs-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
+jackson-module-jaxb-annotations-2.6.7.jar
jackson-module-paranamer-2.7.9.jar
jackson-module-scala_2.11-2.6.7.1.jar
jackson-xc-1.9.13.jar
@@ -131,10 +135,13 @@ jta-1.1.jar
jtransforms-2.4.0.jar
jul-to-slf4j-1.7.16.jar
kryo-shaded-3.0.3.jar
+kubernetes-client-3.0.0.jar
+kubernetes-model-2.0.0.jar
leveldbjni-all-1.8.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
log4j-1.2.17.jar
+logging-interceptor-3.8.1.jar
lz4-java-1.4.0.jar
machinist_2.11-0.6.1.jar
macro-compat_2.11-1.1.1.jar
@@ -147,6 +154,8 @@ minlog-1.3.0.jar
netty-3.9.9.Final.jar
netty-all-4.1.17.Final.jar
objenesis-2.1.jar
+okhttp-3.8.1.jar
+okio-1.13.0.jar
opencsv-2.3.jar
orc-core-1.4.1-nohive.jar
orc-mapreduce-1.4.1-nohive.jar
@@ -172,6 +181,7 @@ scalap-2.11.8.jar
shapeless_2.11-2.3.2.jar
slf4j-api-1.7.16.jar
slf4j-log4j12-1.7.16.jar
+snakeyaml-1.15.jar
snappy-0.2.jar
snappy-java-1.1.2.6.jar
spire-macros_2.11-0.13.0.jar
@@ -187,5 +197,6 @@ xbean-asm5-shaded-4.4.jar
xercesImpl-2.9.1.jar
xmlenc-0.52.jar
xz-1.0.jar
+zjsonpatch-0.3.0.jar
zookeeper-3.4.6.jar
zstd-jni-1.3.2-2.jar
diff --git a/dev/lint-java b/dev/lint-java
index c2e80538ef2a..1f0b0c8379ed 100755
--- a/dev/lint-java
+++ b/dev/lint-java
@@ -20,7 +20,7 @@
SCRIPT_DIR="$( cd "$( dirname "$0" )" && pwd )"
SPARK_ROOT_DIR="$(dirname $SCRIPT_DIR)"
-ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
+ERRORS=$($SCRIPT_DIR/../build/mvn -Pkinesis-asl -Pmesos -Pkubernetes -Pyarn -Phive -Phive-thriftserver checkstyle:check | grep ERROR)
if test ! -z "$ERRORS"; then
echo -e "Checkstyle checks failed at following occurrences:\n$ERRORS"
diff --git a/dev/mima b/dev/mima
index 1e3ca9700bc0..cd2694ff4d3d 100755
--- a/dev/mima
+++ b/dev/mima
@@ -24,7 +24,7 @@ set -e
FWDIR="$(cd "`dirname "$0"`"/..; pwd)"
cd "$FWDIR"
-SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
+SPARK_PROFILES="-Pmesos -Pkafka-0-8 -Pkubernetes -Pyarn -Pflume -Pspark-ganglia-lgpl -Pkinesis-asl -Phive-thriftserver -Phive"
TOOLS_CLASSPATH="$(build/sbt -DcopyDependencies=false "export tools/fullClasspath" | tail -n1)"
OLD_DEPS_CLASSPATH="$(build/sbt -DcopyDependencies=false $SPARK_PROFILES "export oldDeps/fullClasspath" | tail -n1)"
diff --git a/dev/scalastyle b/dev/scalastyle
index 89ecc8abd6f8..b8053df05fa2 100755
--- a/dev/scalastyle
+++ b/dev/scalastyle
@@ -24,6 +24,7 @@ ERRORS=$(echo -e "q\n" \
-Pkinesis-asl \
-Pmesos \
-Pkafka-0-8 \
+ -Pkubernetes \
-Pyarn \
-Pflume \
-Phive \
diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index f834563da9dd..b900f0bd913c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -400,6 +400,7 @@ def __hash__(self):
"pyspark.sql.functions",
"pyspark.sql.readwriter",
"pyspark.sql.streaming",
+ "pyspark.sql.udf",
"pyspark.sql.window",
"pyspark.sql.tests",
]
@@ -539,7 +540,7 @@ def __hash__(self):
kubernetes = Module(
name="kubernetes",
dependencies=[],
- source_file_regexes=["resource-managers/kubernetes/core"],
+ source_file_regexes=["resource-managers/kubernetes"],
build_profile_flags=["-Pkubernetes"],
sbt_test_goals=["kubernetes/test"]
)
diff --git a/dev/test-dependencies.sh b/dev/test-dependencies.sh
index 58b295d4f6e0..3bf7618e1ea9 100755
--- a/dev/test-dependencies.sh
+++ b/dev/test-dependencies.sh
@@ -29,7 +29,7 @@ export LC_ALL=C
# TODO: This would be much nicer to do in SBT, once SBT supports Maven-style resolution.
# NOTE: These should match those in the release publishing script
-HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pyarn -Pflume -Phive"
+HADOOP2_MODULE_PROFILES="-Phive-thriftserver -Pmesos -Pkafka-0-8 -Pkubernetes -Pyarn -Pflume -Phive"
MVN="build/mvn"
HADOOP_PROFILES=(
hadoop-2.6
diff --git a/docs/_config.yml b/docs/_config.yml
index dcc211204d76..213579037f51 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -14,8 +14,8 @@ include:
# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
-SPARK_VERSION: 2.3.0-SNAPSHOT
-SPARK_VERSION_SHORT: 2.3.0
+SPARK_VERSION: 2.3.1-SNAPSHOT
+SPARK_VERSION_SHORT: 2.3.1
SCALA_BINARY_VERSION: "2.11"
SCALA_VERSION: "2.11.8"
MESOS_VERSION: 1.0.0
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 658e67f99dd7..7277e2fb2731 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -52,8 +52,8 @@ The system currently supports three cluster managers:
* [Apache Mesos](running-on-mesos.html) -- a general cluster manager that can also run Hadoop MapReduce
and service applications.
* [Hadoop YARN](running-on-yarn.html) -- the resource manager in Hadoop 2.
-* [Kubernetes](running-on-kubernetes.html) -- [Kubernetes](https://kubernetes.io/docs/concepts/overview/what-is-kubernetes/)
-is an open-source platform that provides container-centric infrastructure.
+* [Kubernetes](running-on-kubernetes.html) -- an open-source system for automating deployment, scaling,
+ and management of containerized applications.
A third-party project (not supported by the Spark project) exists to add support for
[Nomad](https://github.com/hashicorp/nomad-spark) as a cluster manager.
diff --git a/docs/configuration.md b/docs/configuration.md
index 1189aea2aa71..eecb39dcafc9 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -58,6 +58,10 @@ The following format is accepted:
1t or 1tb (tebibytes = 1024 gibibytes)
1p or 1pb (pebibytes = 1024 tebibytes)
+While numbers without units are generally interpreted as bytes, a few are interpreted as KiB or MiB.
+See documentation of individual configuration properties. Specifying units is desirable where
+possible.
+
## Dynamically Loading Spark Properties
In some cases, you may want to avoid hard-coding certain configurations in a `SparkConf`. For
@@ -136,9 +140,9 @@ of the most common options to set are:
  <td><code>spark.driver.maxResultSize</code></td>
  <td>1g</td>
  <td>
- Limit of total size of serialized results of all partitions for each Spark action (e.g. collect).
- Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total size
- is above this limit.
+ Limit of total size of serialized results of all partitions for each Spark action (e.g.
+ collect) in bytes. Should be at least 1M, or 0 for unlimited. Jobs will be aborted if the total
+ size is above this limit.
Having a high limit may cause out-of-memory errors in driver (depends on spark.driver.memory
and memory overhead of objects in JVM). Setting a proper limit can protect the driver from
out-of-memory errors.
@@ -148,10 +152,10 @@ of the most common options to set are:
  <td><code>spark.driver.memory</code></td>
  <td>1g</td>
  <td>
- Amount of memory to use for the driver process, i.e. where SparkContext is initialized.
- (e.g. 1g, 2g).
-
- Note: In client mode, this config must not be set through the SparkConf
+ Amount of memory to use for the driver process, i.e. where SparkContext is initialized, in MiB
+ unless otherwise specified (e.g. 1g, 2g).
+
+ Note: In client mode, this config must not be set through the SparkConf
directly in your application, because the driver JVM has already started at that point.
Instead, please set this through the --driver-memory command line option
or in your default properties file.
@@ -161,27 +165,28 @@ of the most common options to set are:
  <td><code>spark.driver.memoryOverhead</code></td>
  <td>driverMemory * 0.10, with minimum of 384</td>
  <td>
- The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is
- memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
- This tends to grow with the container size (typically 6-10%). This option is currently supported
- on YARN and Kubernetes.
+ The amount of off-heap memory to be allocated per driver in cluster mode, in MiB unless
+ otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
+ other native overheads, etc. This tends to grow with the container size (typically 6-10%).
+ This option is currently supported on YARN and Kubernetes.
  </td>
</tr>