-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-3454] Expose JSON representation of data shown in WebU #2333
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
353a423
b8578a7
937c8b7
437c241
5556856
f7958b0
9e0010a
8850706
e537be6
c108200
2f8f9f3
a349d0e
eb49ea5
d4d8c22
36ce0ed
1882f38
6b159ed
270346a
f1b6bcf
72c0644
a2dbe2e
b50a383
7f51a4f
d41b3ca
7b4d6eb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,33 @@ package org.apache.spark.ui.env | |
|
|
||
| import javax.servlet.http.HttpServletRequest | ||
|
|
||
| import org.json4s.{JObject, JValue} | ||
| import org.json4s.JsonDSL._ | ||
|
|
||
| import scala.xml.Node | ||
|
|
||
| import org.apache.spark.ui.{UIUtils, WebUIPage} | ||
|
|
||
| private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { | ||
| private val listener = parent.listener | ||
|
|
||
| override def renderJson(request: HttpServletRequest): JValue = { | ||
| val jvmInfoJson = | ||
| ("RUntime Informationf" -> listener.jvmInformation.foldLeft(JObject())(_ ~ _)) | ||
| val sparkPropertiesJson = | ||
| ("Spark Properties" -> listener.sparkProperties.foldLeft(JObject())(_ ~ _)) | ||
| val systemPropertiesJson = | ||
| ("System Properties" -> listener.systemProperties.foldLeft(JObject())(_ ~ _)) | ||
| val classPathEntriesJson = | ||
| ("Classpath Entries" -> listener.classpathEntries.foldLeft(JObject())(_ ~ _)) | ||
|
|
||
| val environment = ("Environment" -> jvmInfoJson ~ | ||
| sparkPropertiesJson ~ | ||
| systemPropertiesJson ~ | ||
| classPathEntriesJson) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should this just return the raw |
||
| environment | ||
| } | ||
|
|
||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
| val runtimeInformationTable = UIUtils.listingTable( | ||
| propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,9 @@ import javax.servlet.http.HttpServletRequest | |
|
|
||
| import scala.xml.Node | ||
|
|
||
| import org.json4s.JValue | ||
| import org.json4s.JsonDSL._ | ||
|
|
||
| import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} | ||
| import org.apache.spark.util.Utils | ||
|
|
||
|
|
@@ -44,6 +47,30 @@ private case class ExecutorSummaryInfo( | |
| private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { | ||
| private val listener = parent.listener | ||
|
|
||
| override def renderJson(request: HttpServletRequest): JValue = { | ||
| val storageStatusList = listener.storageStatusList | ||
|
|
||
| val execInfoJsonList = for (statusId <- 0 until storageStatusList.size) yield { | ||
| val execInfo = getExecInfo(statusId) | ||
| ("Executor ID" -> execInfo.id) ~ | ||
| ("Address" -> execInfo.hostPort) ~ | ||
| ("RDD Blocks" -> execInfo.rddBlocks) ~ | ||
| ("Memory Used" -> execInfo.memoryUsed) ~ | ||
| ("Disk Used" -> execInfo.diskUsed) ~ | ||
| ("Active Tasks" -> execInfo.activeTasks) ~ | ||
| ("Failed Tasks" -> execInfo.failedTasks) ~ | ||
| ("Complete Tasks" -> execInfo.completedTasks) ~ | ||
| ("TotalTasks" -> execInfo.totalTasks) ~ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. space |
||
| ("Task Time" -> execInfo.totalDuration) ~ | ||
| ("Input" -> execInfo.totalInputBytes) ~ | ||
| ("Shuffle Read" -> execInfo.totalShuffleRead) ~ | ||
| ("Shuffle Write" -> execInfo.totalShuffleWrite) | ||
| } | ||
|
|
||
| ("Executor List" -> execInfoJsonList) | ||
|
|
||
| } | ||
|
|
||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
| val storageStatusList = listener.storageStatusList | ||
| val maxMem = storageStatusList.map(_.maxMem).sum | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,9 +19,14 @@ package org.apache.spark.ui.jobs | |
|
|
||
| import javax.servlet.http.HttpServletRequest | ||
|
|
||
| import org.apache.spark.util.JsonProtocol | ||
| import org.json4s.JValue | ||
| import org.json4s.JsonAST.JNothing | ||
| import org.json4s.JsonDSL._ | ||
|
|
||
| import scala.xml.{Node, NodeSeq} | ||
|
|
||
| import org.apache.spark.scheduler.Schedulable | ||
| import org.apache.spark.scheduler.{StageInfo, Schedulable} | ||
| import org.apache.spark.ui.{WebUIPage, UIUtils} | ||
|
|
||
| /** Page showing list of all ongoing and recently finished stages and pools */ | ||
|
|
@@ -31,6 +36,35 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") | |
| private val listener = parent.listener | ||
| private lazy val isFairScheduler = parent.isFairScheduler | ||
|
|
||
| override def renderJson(request: HttpServletRequest): JValue = { | ||
| listener.synchronized { | ||
|
|
||
| val activeStageList = listener.activeStages.values.map { | ||
| case info: StageInfo => | ||
| JsonProtocol.stageInfoToJson(info) | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: just do
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. here and other places |
||
| val activeStageJson = ("Active Stages" -> activeStageList) | ||
|
|
||
| val completedStageList = listener.completedStages.reverse.map { | ||
| case info: StageInfo => | ||
| JsonProtocol.stageInfoToJson(info) | ||
| } | ||
| val completedStageJson = ("Completed Stages" -> completedStageList) | ||
|
|
||
| val failedStageList = listener.failedStages.reverse.map { | ||
| case info: StageInfo => | ||
| JsonProtocol.stageInfoToJson(info) | ||
| } | ||
| val failedStageJson = ("Failed Stages" -> failedStageList) | ||
|
|
||
| ("Stages Info" -> | ||
| ("Scheduling Mode" -> listener.schedulingMode.map(_.toString).getOrElse("Unknown")) ~ | ||
|
||
| activeStageJson ~ | ||
| completedStageJson ~ | ||
| failedStageJson) | ||
| } | ||
| } | ||
|
|
||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
| listener.synchronized { | ||
| val activeStages = listener.activeStages.values.toSeq | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,8 +19,13 @@ package org.apache.spark.ui.jobs | |
|
|
||
| import javax.servlet.http.HttpServletRequest | ||
|
|
||
| import org.apache.spark.util.JsonProtocol | ||
|
|
||
| import scala.xml.Node | ||
|
|
||
| import org.json4s.JValue | ||
| import org.json4s.JsonDSL._ | ||
|
|
||
| import org.apache.spark.scheduler.{Schedulable, StageInfo} | ||
| import org.apache.spark.ui.{WebUIPage, UIUtils} | ||
|
|
||
|
|
@@ -30,6 +35,35 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { | |
| private val sc = parent.sc | ||
| private val listener = parent.listener | ||
|
|
||
| override def renderJson(request: HttpServletRequest): JValue = { | ||
| listener.synchronized { | ||
| val poolName = request.getParameter("poolname") | ||
| val poolToActiveStages = listener.poolToActiveStages | ||
| val activeStages = poolToActiveStages.get(poolName) match { | ||
| case Some(s) => s.values.map { | ||
| case info: StageInfo => | ||
| JsonProtocol.stageInfoToJson(info) | ||
| } | ||
| case None => Seq[JValue]() | ||
| } | ||
|
|
||
| val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]() | ||
|
|
||
| val poolList = pools.map { | ||
| case schedulable: Schedulable => | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here, no need to do |
||
| ("Pool Name" -> schedulable.name) ~ | ||
| ("Minimum Share" -> schedulable.minShare) ~ | ||
| ("Pool Weight" -> schedulable.weight) ~ | ||
| ("Active Stages" -> activeStages) ~ | ||
| ("Running Tasks" -> schedulable.runningTasks) ~ | ||
| ("Scheduling Mode" -> schedulable.schedulingMode.toString) | ||
| } | ||
|
|
||
| ("Pools" -> poolList) | ||
|
|
||
| } | ||
| } | ||
|
|
||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
| listener.synchronized { | ||
| val poolName = request.getParameter("poolname") | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,17 +20,76 @@ package org.apache.spark.ui.jobs | |
| import java.util.Date | ||
| import javax.servlet.http.HttpServletRequest | ||
|
|
||
| import org.json4s.JsonAST.JNothing | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor, group this import with other |
||
|
|
||
| import scala.xml.{Node, Unparsed} | ||
|
|
||
| import org.json4s.{JObject, JValue} | ||
| import org.json4s.JsonDSL._ | ||
|
|
||
| import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils} | ||
| import org.apache.spark.ui.jobs.UIData._ | ||
| import org.apache.spark.util.{Utils, Distribution} | ||
| import org.apache.spark.util.{JsonProtocol, Utils, Distribution} | ||
| import org.apache.spark.scheduler.AccumulableInfo | ||
|
|
||
| /** Page showing statistics and task list for a given stage */ | ||
| private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { | ||
| private val listener = parent.listener | ||
|
|
||
| override def renderJson(request: HttpServletRequest): JValue = { | ||
| val stageId = request.getParameter("id").toInt | ||
| val stageAttemptId = request.getParameter("attempt").toInt | ||
|
|
||
| var stageSummary = ("Stage ID" -> stageId) ~ ("Stage Attempt ID" -> stageAttemptId) | ||
|
|
||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. extra new line |
||
|
|
||
| val stageDataOpt = listener.stageIdToData.get((stageId, stageAttemptId)) | ||
| var retVal: JValue = JNothing | ||
|
|
||
| if (!stageDataOpt.isEmpty && !stageDataOpt.get.taskData.isEmpty) { | ||
| val stageData = stageDataOpt.get | ||
|
|
||
| stageSummary ~= ("Executor Run Time" -> stageData.executorRunTime) | ||
| if (stageData.inputBytes > 0) stageSummary ~= ("Input Bytes" -> stageData.inputBytes) | ||
| if (stageData.shuffleReadBytes > 0) { | ||
| stageSummary ~= ("Shuffle Read Bytes" -> stageData.shuffleReadBytes) | ||
| } | ||
|
|
||
| if (stageData.shuffleWriteBytes > 0) { | ||
| stageSummary ~= ("Shuffle Write bytes" -> stageData.shuffleWriteBytes) | ||
| } | ||
|
|
||
| if (stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0) { | ||
| stageSummary ~= | ||
| ("Memory Bytes Spilled" -> stageData.memoryBytesSpilled) ~ | ||
| ("Disk Bytes Spilled" -> stageData.diskBytesSpilled) | ||
| } | ||
|
|
||
| val tasks = stageData.taskData.values.toSeq.sortBy(_.taskInfo.launchTime) | ||
|
|
||
| val taskList = tasks.map { | ||
| case uiData: TaskUIData => | ||
| var jsonTaskInfo: JValue = JsonProtocol.taskInfoToJson(uiData.taskInfo) | ||
| val jsonTaskMetrics: JValue = | ||
| if (uiData.taskMetrics.isDefined) { | ||
| JsonProtocol.taskMetricsToJson(uiData.taskMetrics.get) | ||
| } else JNothing | ||
|
|
||
| if (jsonTaskInfo.isInstanceOf[JObject] && jsonTaskMetrics.isInstanceOf[JObject]) { | ||
| jsonTaskInfo = | ||
| jsonTaskInfo.asInstanceOf[JObject] ~ jsonTaskMetrics.asInstanceOf[JObject] | ||
| } | ||
| jsonTaskInfo | ||
| } | ||
|
|
||
| retVal = | ||
| ("Stage Info" -> | ||
| ("StageSummary" -> stageSummary) ~ | ||
| ("Tasks" -> taskList)) | ||
| } | ||
| retVal | ||
| } | ||
|
|
||
| def render(request: HttpServletRequest): Seq[Node] = { | ||
| listener.synchronized { | ||
| val stageId = request.getParameter("id").toInt | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a lot of typos in this line...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you show what the final JSON looks like? Since this is a map it might be good to convert this faithfully, e.g.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the result of JSON for environment.
}