core/src/main/scala/org/apache/spark/ui/jobs/AllJobsPage.scala (23 changes: 5 additions & 18 deletions)
@@ -214,7 +214,6 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
val someJobHasJobGroup = jobs.exists(_.jobGroup.isDefined)
val jobIdTitle = if (someJobHasJobGroup) "Job Id (Job Group)" else "Job Id"
val jobPage = Option(request.getParameter(jobTag + ".page")).map(_.toInt).getOrElse(1)
val currentTime = System.currentTimeMillis()

try {
new JobPagedTable(
@@ -226,7 +225,6 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
UIUtils.prependBaseUri(request, parent.basePath),
"jobs", // subPath
killEnabled,
currentTime,
jobIdTitle
).table(jobPage)
} catch {
@@ -399,7 +397,6 @@ private[ui] class JobDataSource(
store: AppStatusStore,
jobs: Seq[v1.JobData],
basePath: String,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean) extends PagedDataSource[JobTableRowData](pageSize) {
@@ -410,15 +407,9 @@ private[ui] class JobDataSource(
// so that we can avoid creating duplicate contents during sorting the data
private val data = jobs.map(jobRow).sorted(ordering(sortColumn, desc))

private var _slicedJobIds: Set[Int] = null

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = {
val r = data.slice(from, to)
_slicedJobIds = r.map(_.jobData.jobId).toSet
r
}
override def sliceData(from: Int, to: Int): Seq[JobTableRowData] = data.slice(from, to)

private def jobRow(jobData: v1.JobData): JobTableRowData = {
val duration: Option[Long] = JobDataUtil.getDuration(jobData)
@@ -479,17 +470,17 @@ private[ui] class JobPagedTable(
basePath: String,
subPath: String,
killEnabled: Boolean,
currentTime: Long,
jobIdTitle: String
) extends PagedTable[JobTableRowData] {

private val (sortColumn, desc, pageSize) = getTableParameters(request, jobTag, jobIdTitle)
private val parameterPath = basePath + s"/$subPath/?" + getParameterOtherTable(request, jobTag)
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

override def tableId: String = jobTag + "-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped " +
"table-head-clickable table-cell-width-limited"
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = jobTag + ".pageSize"

@@ -499,13 +490,11 @@ private[ui] class JobPagedTable(
store,
data,
basePath,
currentTime,
pageSize,
sortColumn,
desc)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$jobTag.sort=$encodedSortColumn" +
@@ -514,10 +503,8 @@ private[ui] class JobPagedTable(
s"#$tableHeaderId"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
override def goButtonFormPath: String =
s"$parameterPath&$jobTag.sort=$encodedSortColumn&$jobTag.desc=$desc#$tableHeaderId"
}

override def headers: Seq[Node] = {
// Information for each header: title, sortable, tooltip
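The removed _slicedJobIds field was write-only state, so sliceData collapses to a pure slice over the pre-sorted rows. A minimal sketch of the resulting paging pattern; PagedDataSource is pared down here to the two members this diff touches, and IntDataSource is an illustrative stand-in, not Spark code:

// Sketch only: the real PagedDataSource has more members than shown here.
abstract class PagedDataSource[T](val pageSize: Int) {
  def dataSize: Int
  def sliceData(from: Int, to: Int): Seq[T]
}

// Hypothetical concrete source: slicing is a pure function of (from, to),
// with no side-effecting bookkeeping like the removed _slicedJobIds set.
class IntDataSource(data: Seq[Int]) extends PagedDataSource[Int](10) {
  override def dataSize: Int = data.size
  override def sliceData(from: Int, to: Int): Seq[Int] = data.slice(from, to)
}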
core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala (14 changes: 3 additions & 11 deletions)
@@ -212,7 +212,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
stageData,
UIUtils.prependBaseUri(request, parent.basePath) +
s"/stages/stage/?id=${stageId}&attempt=${stageAttemptId}",
currentTime,
pageSize = taskPageSize,
sortColumn = taskSortColumn,
desc = taskSortDesc,
@@ -452,7 +451,6 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We

private[ui] class TaskDataSource(
stage: StageData,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
@@ -474,8 +472,6 @@ private[ui] class TaskDataSource(
_tasksToShow
}

def tasks: Seq[TaskData] = _tasksToShow

def executorLogs(id: String): Map[String, String] = {
executorIdToLogs.getOrElseUpdate(id,
store.asOption(store.executorSummary(id)).map(_.executorLogs).getOrElse(Map.empty))
@@ -486,14 +482,15 @@ private[ui] class TaskDataSource(
private[ui] class TaskPagedTable(
stage: StageData,
basePath: String,
currentTime: Long,
pageSize: Int,
sortColumn: String,
desc: Boolean,
store: AppStatusStore) extends PagedTable[TaskData] {

import ApiHelper._

private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

override def tableId: String = "task-table"

override def tableCssClass: String =
@@ -505,25 +502,20 @@

override val dataSource: TaskDataSource = new TaskDataSource(
stage,
currentTime,
pageSize,
sortColumn,
desc,
store)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
basePath +
s"&$pageNumberFormField=$page" +
s"&task.sort=$encodedSortColumn" +
s"&task.desc=$desc" +
s"&$pageSizeFormField=$pageSize"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"
}
override def goButtonFormPath: String = s"$basePath&task.sort=$encodedSortColumn&task.desc=$desc"

def headers: Seq[Node] = {
import ApiHelper._
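Both page classes now URL-encode the sort column once at construction (the new encodedSortColumn val) instead of re-encoding it inside every pageLink and goButtonFormPath call. A self-contained sketch of that encoding step; the sample column name is illustrative:

import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8

object EncodeSortColumnDemo extends App {
  // Encoded once, then reused by every generated pagination link.
  val sortColumn = "Job Id (Job Group)"
  val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
  println(encodedSortColumn) // prints: Job+Id+%28Job+Group%29
}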
core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala (21 changes: 6 additions & 15 deletions)
@@ -116,16 +116,17 @@ private[ui] class StagePagedTable(
override def tableId: String = stageTag + "-table"

override def tableCssClass: String =
"table table-bordered table-sm table-striped " +
"table-head-clickable table-cell-width-limited"
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"

override def pageSizeFormField: String = stageTag + ".pageSize"

override def pageNumberFormField: String = stageTag + ".page"

private val (sortColumn, desc, pageSize) = getTableParameters(request, stageTag, "Stage Id")

val parameterPath = UIUtils.prependBaseUri(request, basePath) + s"/$subPath/?" +
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())

private val parameterPath = UIUtils.prependBaseUri(request, basePath) + s"/$subPath/?" +
getParameterOtherTable(request, stageTag)

override val dataSource = new StageDataSource(
@@ -138,7 +139,6 @@
)

override def pageLink(page: Int): String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
parameterPath +
s"&$pageNumberFormField=$page" +
s"&$stageTag.sort=$encodedSortColumn" +
Expand All @@ -147,10 +147,8 @@ private[ui] class StagePagedTable(
s"#$tableHeaderId"
}

override def goButtonFormPath: String = {
val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
override def goButtonFormPath: String =
s"$parameterPath&$stageTag.sort=$encodedSortColumn&$stageTag.desc=$desc#$tableHeaderId"
}

override def headers: Seq[Node] = {
// stageHeadersAndCssClasses has three parts: header title, sortable and tooltip information.
@@ -311,15 +309,9 @@ private[ui] class StageDataSource(
// table so that we can avoid creating duplicate contents during sorting the data
private val data = stages.map(stageRow).sorted(ordering(sortColumn, desc))

private var _slicedStageIds: Set[Int] = _

override def dataSize: Int = data.size

override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = {
val r = data.slice(from, to)
_slicedStageIds = r.map(_.stageId).toSet
r
}
override def sliceData(from: Int, to: Int): Seq[StageTableRowData] = data.slice(from, to)

private def stageRow(stageData: v1.StageData): StageTableRowData = {
val formattedSubmissionTime = stageData.submissionTime match {
@@ -350,7 +342,6 @@ private[ui] class StageDataSource(
val shuffleWrite = stageData.shuffleWriteBytes
val shuffleWriteWithUnit = if (shuffleWrite > 0) Utils.bytesToString(shuffleWrite) else ""


new StageTableRowData(
stageData,
Some(stageData),
@@ -487,6 +487,7 @@ private[spark] object JsonProtocol {
("Callsite" -> rddInfo.callSite) ~
("Parent IDs" -> parentIds) ~
("Storage Level" -> storageLevel) ~
("Barrier" -> rddInfo.isBarrier) ~
("Number of Partitions" -> rddInfo.numPartitions) ~
("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
("Memory Size" -> rddInfo.memSize) ~
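The new "Barrier" entry slots into the existing json4s field chain for RDDInfo. A hedged sketch of the same DSL with made-up values; only the field names are taken from the diff:

import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object BarrierFieldDemo extends App {
  // Illustrative values; "Barrier" is the field this change adds.
  val rddJson =
    ("RDD ID" -> 3) ~
    ("Name" -> "MapPartitionsRDD") ~
    ("Barrier" -> true) ~
    ("Number of Partitions" -> 4)

  println(compact(render(rddJson)))
  // {"RDD ID":3,"Name":"MapPartitionsRDD","Barrier":true,"Number of Partitions":4}
}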
@@ -25,6 +25,7 @@ import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar._

import org.apache.spark._
import org.apache.spark.internal.config
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY

class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
@@ -37,10 +38,10 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
.setAppName("test-cluster")
.set(TEST_NO_STAGE_RETRY, true)
sc = new SparkContext(conf)
TestUtils.waitUntilExecutorsUp(sc, numWorker, 60000)
}

// TODO (SPARK-31730): re-enable it
ignore("global sync by barrier() call") {
test("global sync by barrier() call") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -57,10 +58,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("share messages with allGather() call") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -78,10 +76,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("throw exception if we attempt to synchronize with different blocking calls") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -100,10 +95,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
}

test("successively sync with allGather and barrier") {
val conf = new SparkConf()
.setMaster("local-cluster[4, 1, 1024]")
.setAppName("test-cluster")
sc = new SparkContext(conf)
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
val context = BarrierTaskContext.get()
@@ -129,8 +121,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
assert(times2.max - times2.min <= 1000)
}

// TODO (SPARK-31730): re-enable it
ignore("support multiple barrier() call within a single task") {
test("support multiple barrier() call within a single task") {
initLocalClusterSparkContext()
val rdd = sc.makeRDD(1 to 10, 4)
val rdd2 = rdd.barrier().mapPartitions { it =>
@@ -285,6 +276,9 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with

test("SPARK-31485: barrier stage should fail if only partial tasks are launched") {
initLocalClusterSparkContext(2)
// It's required to reset the delay timer when a task is scheduled, otherwise all the tasks
// could get scheduled at ANY level.
sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
val rdd0 = sc.parallelize(Seq(0, 1, 2, 3), 2)
val dep = new OneToOneDependency[Int](rdd0)
// set up a barrier stage with 2 tasks and both tasks prefer executor 0 (only 1 core) for
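The re-enabled tests share one shape: run a four-task barrier stage and block every task on a shared sync point. A minimal sketch under the same assumptions, given a SparkContext sc whose cluster can launch all four tasks at once (what initLocalClusterSparkContext plus waitUntilExecutorsUp arranges above):

import org.apache.spark.BarrierTaskContext

val rdd2 = sc.makeRDD(1 to 10, 4).barrier().mapPartitions { it =>
  val context = BarrierTaskContext.get()
  context.barrier() // returns only after all 4 tasks have reached this call
  Iterator.single(System.currentTimeMillis())
}
// The timestamps land close together because every task leaves the
// barrier at (nearly) the same moment.
val times = rdd2.collect()
assert(times.max - times.min <= 1000)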
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.history.{EventLogFileReader, SingleEventLogFileWr
import org.apache.spark.deploy.history.EventLogTestHelper._
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{EVENT_LOG_DIR, EVENT_LOG_ENABLED}
import org.apache.spark.io._
import org.apache.spark.metrics.{ExecutorMetricType, MetricsSystem}
import org.apache.spark.resource.ResourceProfile
@@ -100,6 +101,49 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
testStageExecutorMetricsEventLogging()
}

test("SPARK-31764: isBarrier should be logged in event log") {
val conf = new SparkConf()
conf.set(EVENT_LOG_ENABLED, true)
conf.set(EVENT_LOG_DIR, testDirPath.toString)
val sc = new SparkContext("local", "test-SPARK-31764", conf)
val appId = sc.applicationId

sc.parallelize(1 to 10)
.barrier()
.mapPartitions(_.map(elem => (elem, elem)))
.filter(elem => elem._1 % 2 == 0)
.reduceByKey(_ + _)
.collect
sc.stop()

val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
val jobStartEvents = events
.filter(event => event.isInstanceOf[SparkListenerJobStart])
.map(_.asInstanceOf[SparkListenerJobStart])

assert(jobStartEvents.size === 1)
val stageInfos = jobStartEvents.head.stageInfos
assert(stageInfos.size === 2)

val stage0 = stageInfos(0)
val rddInfosInStage0 = stage0.rddInfos
assert(rddInfosInStage0.size === 3)
val sortedRddInfosInStage0 = rddInfosInStage0.sortBy(_.scope.get.name)
assert(sortedRddInfosInStage0(0).scope.get.name === "filter")
assert(sortedRddInfosInStage0(0).isBarrier === true)
assert(sortedRddInfosInStage0(1).scope.get.name === "mapPartitions")
assert(sortedRddInfosInStage0(1).isBarrier === true)
assert(sortedRddInfosInStage0(2).scope.get.name === "parallelize")
assert(sortedRddInfosInStage0(2).isBarrier === false)

val stage1 = stageInfos(1)
val rddInfosInStage1 = stage1.rddInfos
assert(rddInfosInStage1.size === 1)
assert(rddInfosInStage1(0).scope.get.name === "reduceByKey")
assert(rddInfosInStage1(0).isBarrier === false) // reduceByKey
}

/* ----------------- *
* Actual test logic *
* ----------------- */
@@ -98,7 +98,6 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext {
val taskTable = new TaskPagedTable(
stageData,
basePath = "/a/b/c",
currentTime = 0,
pageSize = 10,
sortColumn = "Index",
desc = false,