Skip to content

Commit ab91416

Browse files
author
Andrew Or
committed
Introduce visualization to the Job Page
This includes a generalization of the visualization previously displayed on the stage page. More functionality is needed in JavaScript to prevent the job visualization from looking too cluttered. This is still WIP.
1 parent 5f07e9c commit ab91416

File tree

9 files changed

+223
-160
lines changed

9 files changed

+223
-160
lines changed

core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js

Lines changed: 5 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/main/resources/org/apache/spark/ui/static/spark-stage-viz.js renamed to core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 94 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -15,84 +15,70 @@
1515
* limitations under the License.
1616
*/
1717

18-
var stageVizIsRendered = false
18+
var VizDefaults = {
19+
stageVizOffset: 160
20+
};
1921

2022
/*
21-
* Render or remove the stage visualization on the UI.
23+
* Show or hide the RDD DAG visualization on the UI.
2224
* This assumes that the visualization is stored in the "#viz-graph" element.
2325
*/
24-
function toggleStageViz() {
26+
function toggleShowViz(forJob) {
2527
$(".expand-visualization-arrow").toggleClass('arrow-closed');
2628
$(".expand-visualization-arrow").toggleClass('arrow-open');
27-
var shouldRender = $(".expand-visualization-arrow").hasClass("arrow-open");
28-
if (shouldRender) {
29-
// If the viz is already rendered, just show it
30-
if (stageVizIsRendered) {
31-
$("#viz-graph").show();
32-
} else {
33-
renderStageViz();
34-
stageVizIsRendered = true;
29+
var show = $(".expand-visualization-arrow").hasClass("arrow-open");
30+
if (show) {
31+
var shouldRender = d3.select("#viz-graph svg").empty();
32+
if (shouldRender) {
33+
renderViz();
34+
styleViz(forJob);
3535
}
36+
$("#viz-graph").show();
3637
} else {
37-
// Instead of emptying the element once and for all, cache it for use
38-
// again later in case we want to expand the visualization again
3938
$("#viz-graph").hide();
4039
}
4140
}
4241

4342
/*
44-
* Render a DAG that describes the RDDs for a given stage.
43+
* Render a DAG visualization that describes RDDs in a stage or a job.
4544
*
46-
* Input: The content of a dot file, stored in the text of the "#viz-dot-file" element
45+
* Input: The contents of the relevant dot files, stored in the "#viz-dot-files" element
4746
* Output: An SVG that visualizes the DAG, stored in the "#viz-graph" element
4847
*
4948
* This relies on a custom implementation of dagre-d3, which can be found under
5049
* https://github.com/andrewor14/dagre-d3/dist/dagre-d3.js. For more detail, please
5150
* track the changes in that project after it was forked.
5251
*/
53-
function renderStageViz() {
52+
function renderViz() {
5453

5554
// If there is not a dot file to render, report error
56-
if (d3.select("#viz-dot-file").empty()) {
55+
if (d3.select("#viz-dot-files").empty()) {
5756
d3.select("#viz-graph")
5857
.append("div")
5958
.text("No visualization information available for this stage.");
6059
return;
6160
}
6261

63-
// Parse the dot file and render it in an SVG
64-
var dot = d3.select("#viz-dot-file").text();
65-
var escaped_dot = dot
66-
.replace(/&lt;/g, "<")
67-
.replace(/&gt;/g, ">")
68-
.replace(/&quot;/g, "\"");
69-
var g = graphlibDot.read(escaped_dot);
70-
var render = new dagreD3.render();
7162
var svg = d3.select("#viz-graph").append("svg");
72-
svg.call(render, g);
63+
64+
// Each div in #viz-dot-files stores the content of one dot file
65+
// Each dot file is used to generate the visualization for a stage
66+
d3.selectAll("#viz-dot-files div").each(function(d, i) {
67+
var div = d3.select(this);
68+
var stageId = div.attr("name");
69+
var dot = div.text();
70+
var container = svg.append("g").attr("id", "graph_" + stageId);
71+
// Move the container so it doesn't overlap with the existing ones
72+
container.attr("transform", "translate(" + VizDefaults.stageVizOffset * i + ", 0)");
73+
renderDot(dot, container);
74+
});
7375

7476
// Set the appropriate SVG dimensions to ensure that all elements are displayed
7577
var svgMargin = 20;
7678
var boundingBox = svg.node().getBBox();
7779
svg.style("width", (boundingBox.width + svgMargin) + "px");
7880
svg.style("height", (boundingBox.height + svgMargin) + "px");
7981

80-
// Add style to clusters, nodes and edges
81-
d3.selectAll("svg g.cluster rect")
82-
.style("fill", "none")
83-
.style("stroke", "#AADFFF")
84-
.style("stroke-width", "4px")
85-
.style("stroke-opacity", "0.5");
86-
d3.selectAll("svg g.node rect")
87-
.style("fill", "white")
88-
.style("stroke", "black")
89-
.style("stroke-width", "2px")
90-
.style("fill-opacity", "0.8")
91-
.style("stroke-opacity", "0.9");
92-
d3.selectAll("svg g.edgePath path")
93-
.style("stroke", "black")
94-
.style("stroke-width", "2px");
95-
9682
// Add labels to clusters
9783
d3.selectAll("svg g.cluster").each(function(cluster_data) {
9884
var cluster = d3.select(this);
@@ -103,12 +89,10 @@ function renderStageViz() {
10389
rect.attr("height", toFloat(rect.attr("height")) + 10);
10490
var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - 5;
10591
var labelY = toFloat(rect.attr("y")) + 15;
106-
var labelText = cluster.attr("id").replace(/^cluster/, "").replace(/_.*$/, "");
92+
var labelText = cluster.attr("name").replace(/^cluster_/, "");
10793
cluster.append("text")
10894
.attr("x", labelX)
10995
.attr("y", labelY)
110-
.attr("fill", "#AAAAAA")
111-
.attr("font-size", "11px")
11296
.attr("text-anchor", "end")
11397
.text(labelText);
11498
});
@@ -123,6 +107,71 @@ function renderStageViz() {
123107
svg.attr("viewBox", newViewBox);
124108
}
125109

110+
/*
111+
*
112+
*/
113+
function renderDot(dot, container) {
114+
var escaped_dot = dot
115+
.replace(/&lt;/g, "<")
116+
.replace(/&gt;/g, ">")
117+
.replace(/&quot;/g, "\"");
118+
var g = graphlibDot.read(escaped_dot);
119+
var renderer = new dagreD3.render();
120+
renderer(container, g);
121+
}
122+
123+
/*
124+
* Style the visualization we just rendered.
125+
* We apply a different style depending on whether this is a stage or a job.
126+
*/
127+
function styleViz(forJob) {
128+
if (forJob) {
129+
styleJobViz();
130+
} else {
131+
styleStageViz();
132+
}
133+
}
134+
135+
function styleJobViz() {
136+
d3.selectAll("svg g.cluster rect")
137+
.style("fill", "none")
138+
.style("stroke", "#AADFFF")
139+
.style("stroke-width", "4px")
140+
.style("stroke-opacity", "0.5");
141+
d3.selectAll("svg g.node rect")
142+
.style("fill", "white")
143+
.style("stroke", "black")
144+
.style("stroke-width", "2px")
145+
.style("fill-opacity", "0.8")
146+
.style("stroke-opacity", "0.9");
147+
d3.selectAll("svg g.edgePath path")
148+
.style("stroke", "black")
149+
.style("stroke-width", "2px");
150+
d3.selectAll("svg g.cluster text")
151+
.attr("fill", "#AAAAAA")
152+
.attr("font-size", "11px")
153+
}
154+
155+
function styleStageViz() {
156+
d3.selectAll("svg g.cluster rect")
157+
.style("fill", "none")
158+
.style("stroke", "#AADFFF")
159+
.style("stroke-width", "4px")
160+
.style("stroke-opacity", "0.5");
161+
d3.selectAll("svg g.node rect")
162+
.style("fill", "white")
163+
.style("stroke", "black")
164+
.style("stroke-width", "2px")
165+
.style("fill-opacity", "0.8")
166+
.style("stroke-opacity", "0.9");
167+
d3.selectAll("svg g.edgePath path")
168+
.style("stroke", "black")
169+
.style("stroke-width", "2px");
170+
d3.selectAll("svg g.cluster text")
171+
.attr("fill", "#AAAAAA")
172+
.attr("font-size", "11px")
173+
}
174+
126175
/* Helper method to convert attributes to numeric values. */
127176
function toFloat(f) {
128177
return parseFloat(f.replace(/px$/, ""))

core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class OrderedRDDFunctions[K : Ordering : ClassTag,
4747
self: RDD[P])
4848
extends Logging with Serializable
4949
{
50+
// TODO: Don't forget to scope me later
51+
5052
private val ordering = implicitly[Ordering[K]]
5153

5254
/**

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ private[spark] object UIUtils extends Logging {
171171
<script src={prependBaseUri("/static/d3.min.js")}></script>
172172
<script src={prependBaseUri("/static/dagre-d3.min.js")}></script>
173173
<script src={prependBaseUri("/static/graphlib-dot.min.js")}></script>
174-
<script src={prependBaseUri("/static/spark-stage-viz.js")}></script>
174+
<script src={prependBaseUri("/static/spark-dag-viz.js")}></script>
175175
}
176176

177177
/** Returns a spark page with correctly formatted headers */

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.ui.{UIUtils, WebUIPage}
2929
/** Page showing statistics and stage list for a given job */
3030
private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
3131
private val listener = parent.listener
32+
private val vizListener = parent.vizListener
3233

3334
def render(request: HttpServletRequest): Seq[Node] = {
3435
listener.synchronized {
@@ -154,6 +155,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
154155
</div>
155156

156157
var content = summary
158+
content ++= vizListener.showVizElementForJob(jobId)
157159
if (shouldShowActiveStages) {
158160
content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
159161
activeStagesTable.toNodeSeq
@@ -174,7 +176,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
174176
content ++= <h4 id ="failed">Failed Stages ({failedStages.size})</h4> ++
175177
failedStagesTable.toNodeSeq
176178
}
177-
UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent)
179+
UIUtils.headerSparkPage(s"Details for Job $jobId", content, parent, showVisualization = true)
178180
}
179181
}
180182
}

core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
2525
val sc = parent.sc
2626
val killEnabled = parent.killEnabled
2727
def isFairScheduler: Boolean = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
28+
// TODO: rename this listener
2829
val listener = parent.jobProgressListener
30+
val vizListener = parent.visualizationListener
2931

3032
attachPage(new AllJobsPage(this))
3133
attachPage(new JobPage(this))

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -36,32 +36,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
3636
private val progressListener = parent.progressListener
3737
private val vizListener = parent.vizListener
3838

39-
/**
40-
* Return a DOM element containing the "Show Visualization" toggle that, if enabled,
41-
* renders the visualization for the stage. If there is no visualization information
42-
* available for this stage, an appropriate message is displayed to the user.
43-
*/
44-
private def showVizElement(stageId: Int): Seq[Node] = {
45-
val graph = vizListener.getVizGraph(stageId)
46-
<div>
47-
<span class="expand-visualization" onclick="render();">
48-
<span class="expand-visualization-arrow arrow-closed"></span>
49-
<strong>Show Visualization</strong>
50-
</span>
51-
<div id="viz-graph"></div>
52-
<script type="text/javascript">
53-
{Unparsed(s"function render() { toggleStageViz(); }")}
54-
</script>
55-
{
56-
if (graph.isDefined) {
57-
<div id="viz-dot-file" style="display:none">
58-
{VizGraph.makeDotFile(graph.get)}
59-
</div>
60-
}
61-
}
62-
</div>
63-
}
64-
6539
def render(request: HttpServletRequest): Seq[Node] = {
6640
progressListener.synchronized {
6741
val parameterId = request.getParameter("id")
@@ -462,7 +436,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
462436
val content =
463437
summary ++
464438
showAdditionalMetrics ++
465-
showVizElement(stageId) ++
439+
vizListener.showVizElementForStage(stageId) ++
466440
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
467441
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
468442
<h4>Aggregated Metrics by Executor</h4> ++ executorTable.toNodeSeq ++

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,39 +18,81 @@
1818
package org.apache.spark.ui.viz
1919

2020
import scala.collection.mutable
21+
import scala.xml.{Node, Unparsed}
2122

2223
import org.apache.spark.SparkConf
2324
import org.apache.spark.scheduler._
2425
import org.apache.spark.ui.SparkUI
2526

2627
/**
27-
* A SparkListener that constructs a graph of the RDD DAG for each stage.
28-
* This graph will be used for rendering visualization in the UI later.
28+
* A SparkListener that constructs RDD DAG visualization for the UI.
2929
*/
3030
private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
31-
32-
// A list of stage IDs to track the order in which stages are inserted
33-
private val stageIds = new mutable.ArrayBuffer[Int]
34-
35-
// Stage ID -> graph metadata for the stage
31+
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
3632
private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]
33+
private val stageIds = new mutable.ArrayBuffer[Int]
3734

38-
// How many stages to retain graph metadata for
35+
// How many jobs or stages to retain graph metadata for
3936
private val retainedStages =
4037
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
4138

39+
/** Construct a "Show visualization" DOM element that expands into a visualization for a stage. */
40+
def showVizElementForStage(stageId: Int): Seq[Node] = {
41+
showVizElement(getVizGraphForStage(stageId).toSeq, forJob = false)
42+
}
43+
44+
/** Construct a "Show visualization" DOM element that expands into a visualization for a job. */
45+
def showVizElementForJob(jobId: Int): Seq[Node] = {
46+
showVizElement(getVizGraphsForJob(jobId), forJob = true)
47+
}
48+
49+
/** Construct a "Show visualization" DOM element that expands into a visualization on the UI. */
50+
private def showVizElement(graphs: Seq[VizGraph], forJob: Boolean): Seq[Node] = {
51+
<div>
52+
<span class="expand-visualization" onclick="render();">
53+
<span class="expand-visualization-arrow arrow-closed"></span>
54+
<strong>Show visualization</strong>
55+
</span>
56+
<div id="viz-graph"></div>
57+
<div id="viz-dot-files">
58+
{
59+
graphs.map { g =>
60+
<div name={g.rootScope.id} style="display:none">
61+
{VizGraph.makeDotFile(g, forJob)}
62+
</div>
63+
}
64+
}
65+
</div>
66+
</div> ++
67+
<script type="text/javascript">
68+
{Unparsed(s"function render() { toggleShowViz($forJob); }")}
69+
</script>
70+
}
71+
4272
/** Return the graph metadata for the given stage, or None if no such information exists. */
43-
def getVizGraph(stageId: Int): Option[VizGraph] = {
73+
private def getVizGraphsForJob(jobId: Int): Seq[VizGraph] = {
74+
jobIdToStageIds.get(jobId)
75+
.map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } }
76+
.getOrElse { Seq.empty }
77+
}
78+
79+
/** Return the graph metadata for the given stage, or None if no such information exists. */
80+
private def getVizGraphForStage(stageId: Int): Option[VizGraph] = {
4481
stageIdToGraph.get(stageId)
4582
}
4683

47-
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
48-
val stageId = stageCompleted.stageInfo.stageId
49-
val rddInfos = stageCompleted.stageInfo.rddInfos
50-
val vizGraph = VizGraph.makeVizGraph(rddInfos)
51-
stageIdToGraph(stageId) = vizGraph
84+
/** On job start, construct a VizGraph for each stage in the job for display later. */
85+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
86+
val jobId = jobStart.jobId
87+
val stageInfos = jobStart.stageInfos
88+
89+
stageInfos.foreach { stageInfo =>
90+
stageIds += stageInfo.stageId
91+
stageIdToGraph(stageInfo.stageId) = VizGraph.makeVizGraph(stageInfo)
92+
}
93+
jobIdToStageIds(jobId) = stageInfos.map(_.stageId)
5294

53-
// Remove metadata for old stages
95+
// Remove graph metadata for old stages
5496
if (stageIds.size >= retainedStages) {
5597
val toRemove = math.max(retainedStages / 10, 1)
5698
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }

0 commit comments

Comments
 (0)