Skip to content

Commit 563bfcc

Browse files
author
Andrew Or
committed
[SPARK-7627] [SPARK-7472] DAG visualization: style skipped stages
This patch fixes two things: **SPARK-7627.** Cached RDDs no longer light up on the job page. This is a simple fix. **SPARK-7472.** Display skipped stages differently from normal stages. The latter is a major UX issue. Because we link the job viz to the stage viz even for skipped stages, the user may inadvertently click into the stage page of a skipped stage, which is empty. ------------------- <img src="https://cloud.githubusercontent.com/assets/2133137/7675241/de1a3da6-fcea-11e4-8101-88055cef78c5.png" width="300px" /> Author: Andrew Or <[email protected]> Closes apache#6171 from andrewor14/dag-viz-skipped and squashes the following commits: f261797 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped 0eda358 [Andrew Or] Tweak skipped stage border color c604150 [Andrew Or] Tweak grayscale colors 7010676 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped 762b541 [Andrew Or] Use special prefix for stage clusters to avoid collisions 51c95b9 [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped b928cd4 [Andrew Or] Fix potential leak + write tests for it 7c4c364 [Andrew Or] Show skipped stages differently 7cc34ce [Andrew Or] Merge branch 'master' of github.com:apache/spark into dag-viz-skipped c121fa2 [Andrew Or] Fix cache color
1 parent 814b3da commit 563bfcc

File tree

6 files changed

+352
-108
lines changed

6 files changed

+352
-108
lines changed

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css

Lines changed: 44 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,32 +15,21 @@
1515
* limitations under the License.
1616
*/
1717

18-
#dag-viz-graph svg path {
19-
stroke: #444;
20-
stroke-width: 1.5px;
21-
}
22-
23-
#dag-viz-graph svg g.cluster rect {
24-
stroke-width: 1px;
25-
}
26-
27-
#dag-viz-graph svg g.node circle {
28-
fill: #444;
18+
#dag-viz-graph a, #dag-viz-graph a:hover {
19+
text-decoration: none;
2920
}
3021

31-
#dag-viz-graph svg g.node rect {
32-
fill: #C3EBFF;
33-
stroke: #3EC0FF;
34-
stroke-width: 1px;
22+
#dag-viz-graph .label {
23+
font-weight: normal;
24+
text-shadow: none;
3525
}
3626

37-
#dag-viz-graph svg g.node.cached circle {
38-
fill: #444;
27+
#dag-viz-graph svg path {
28+
stroke: #444;
29+
stroke-width: 1.5px;
3930
}
4031

41-
#dag-viz-graph svg g.node.cached rect {
42-
fill: #B3F5C5;
43-
stroke: #56F578;
32+
#dag-viz-graph svg g.cluster rect {
4433
stroke-width: 1px;
4534
}
4635

@@ -61,12 +50,23 @@
6150
stroke-width: 1px;
6251
}
6352

64-
#dag-viz-graph svg.job g.cluster[class*="stage"] rect {
53+
#dag-viz-graph svg.job g.cluster.skipped rect {
54+
fill: #D6D6D6;
55+
stroke: #B7B7B7;
56+
stroke-width: 1px;
57+
}
58+
59+
#dag-viz-graph svg.job g.cluster.stage rect {
6560
fill: #FFFFFF;
6661
stroke: #FF99AC;
6762
stroke-width: 1px;
6863
}
6964

65+
#dag-viz-graph svg.job g.cluster.stage.skipped rect {
66+
stroke: #ADADAD;
67+
stroke-width: 1px;
68+
}
69+
7070
#dag-viz-graph svg.job g#cross-stage-edges path {
7171
fill: none;
7272
}
@@ -75,6 +75,20 @@
7575
fill: #333;
7676
}
7777

78+
#dag-viz-graph svg.job g.cluster.skipped text {
79+
fill: #666;
80+
}
81+
82+
#dag-viz-graph svg.job g.node circle {
83+
fill: #444;
84+
}
85+
86+
#dag-viz-graph svg.job g.node.cached circle {
87+
fill: #A3F545;
88+
stroke: #52C366;
89+
stroke-width: 2px;
90+
}
91+
7892
/* Stage page specific styles */
7993

8094
#dag-viz-graph svg.stage g.cluster rect {
@@ -83,7 +97,7 @@
8397
stroke-width: 1px;
8498
}
8599

86-
#dag-viz-graph svg.stage g.cluster[class*="stage"] rect {
100+
#dag-viz-graph svg.stage g.cluster.stage rect {
87101
fill: #FFFFFF;
88102
stroke: #FFA6B6;
89103
stroke-width: 1px;
@@ -97,11 +111,14 @@
97111
fill: #333;
98112
}
99113

100-
#dag-viz-graph a, #dag-viz-graph a:hover {
101-
text-decoration: none;
114+
#dag-viz-graph svg.stage g.node rect {
115+
fill: #C3EBFF;
116+
stroke: #3EC0FF;
117+
stroke-width: 1px;
102118
}
103119

104-
#dag-viz-graph .label {
105-
font-weight: normal;
106-
text-shadow: none;
120+
#dag-viz-graph svg.stage g.node.cached rect {
121+
fill: #B3F5C5;
122+
stroke: #52C366;
123+
stroke-width: 2px;
107124
}

core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js

Lines changed: 32 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ var VizConstants = {
5757
stageSep: 40,
5858
graphPrefix: "graph_",
5959
nodePrefix: "node_",
60-
stagePrefix: "stage_",
61-
clusterPrefix: "cluster_",
62-
stageClusterPrefix: "cluster_stage_"
60+
clusterPrefix: "cluster_"
6361
};
6462

6563
var JobPageVizConstants = {
@@ -133,9 +131,7 @@ function renderDagViz(forJob) {
133131
}
134132

135133
// Render
136-
var svg = graphContainer()
137-
.append("svg")
138-
.attr("class", jobOrStage);
134+
var svg = graphContainer().append("svg").attr("class", jobOrStage);
139135
if (forJob) {
140136
renderDagVizForJob(svg);
141137
} else {
@@ -185,23 +181,32 @@ function renderDagVizForJob(svgContainer) {
185181
var dot = metadata.select(".dot-file").text();
186182
var stageId = metadata.attr("stage-id");
187183
var containerId = VizConstants.graphPrefix + stageId;
188-
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
189-
var stageLink = $("#stage-" + stageId.replace(VizConstants.stagePrefix, "") + "-0")
190-
.find("a")
191-
.attr("href") + "&expandDagViz=true";
192-
var container = svgContainer
193-
.append("a")
194-
.attr("xlink:href", stageLink)
195-
.append("g")
196-
.attr("id", containerId);
184+
var isSkipped = metadata.attr("skipped") == "true";
185+
var container;
186+
if (isSkipped) {
187+
container = svgContainer
188+
.append("g")
189+
.attr("id", containerId)
190+
.attr("skipped", "true");
191+
} else {
192+
// Link each graph to the corresponding stage page (TODO: handle stage attempts)
193+
// Use the link from the stage table so it also works for the history server
194+
var attemptId = 0
195+
var stageLink = d3.select("#stage-" + stageId + "-" + attemptId)
196+
.select("a")
197+
.attr("href") + "&expandDagViz=true";
198+
container = svgContainer
199+
.append("a")
200+
.attr("xlink:href", stageLink)
201+
.append("g")
202+
.attr("id", containerId);
203+
}
197204

198205
// Now we need to shift the container for this stage so it doesn't overlap with
199206
// existing ones, taking into account the position and width of the last stage's
200207
// container. We do not need to do this for the first stage of this job.
201208
if (i > 0) {
202-
var existingStages = svgContainer
203-
.selectAll("g.cluster")
204-
.filter("[class*=\"" + VizConstants.stageClusterPrefix + "\"]");
209+
var existingStages = svgContainer.selectAll("g.cluster.stage")
205210
if (!existingStages.empty()) {
206211
var lastStage = d3.select(existingStages[0].pop());
207212
var lastStageWidth = toFloat(lastStage.select("rect").attr("width"));
@@ -214,6 +219,12 @@ function renderDagVizForJob(svgContainer) {
214219
// Actually render the stage
215220
renderDot(dot, container, true);
216221

222+
// Mark elements as skipped if appropriate. Unfortunately we need to mark all
223+
// elements instead of the parent container because of CSS override rules.
224+
if (isSkipped) {
225+
container.selectAll("g").classed("skipped", true);
226+
}
227+
217228
// Round corners on rectangles
218229
container
219230
.selectAll("rect")
@@ -243,6 +254,9 @@ function renderDot(dot, container, forJob) {
243254
var renderer = new dagreD3.render();
244255
preprocessGraphLayout(g, forJob);
245256
renderer(container, g);
257+
258+
// Find the stage cluster and mark it for styling and post-processing
259+
container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
246260
}
247261

248262
/* -------------------- *

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,10 +352,12 @@ private[spark] object UIUtils extends Logging {
352352
</a>
353353
</span>
354354
<div id="dag-viz-graph"></div>
355-
<div id="dag-viz-metadata">
355+
<div id="dag-viz-metadata" style="display:none">
356356
{
357357
graphs.map { g =>
358-
<div class="stage-metadata" stage-id={g.rootCluster.id} style="display:none">
358+
val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "")
359+
val skipped = g.rootCluster.name.contains("skipped").toString
360+
<div class="stage-metadata" stage-id={stageId} skipped={skipped}>
359361
<div class="dot-file">{RDDOperationGraph.makeDotFile(g)}</div>
360362
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
361363
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,10 +52,13 @@ private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
5252
* This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap),
5353
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
5454
*/
55-
private[ui] class RDDOperationCluster(val id: String, val name: String) {
55+
private[ui] class RDDOperationCluster(val id: String, private var _name: String) {
5656
private val _childNodes = new ListBuffer[RDDOperationNode]
5757
private val _childClusters = new ListBuffer[RDDOperationCluster]
5858

59+
def name: String = _name
60+
def setName(n: String): Unit = { _name = n }
61+
5962
def childNodes: Seq[RDDOperationNode] = _childNodes.iterator.toSeq
6063
def childClusters: Seq[RDDOperationCluster] = _childClusters.iterator.toSeq
6164
def attachChildNode(childNode: RDDOperationNode): Unit = { _childNodes += childNode }
@@ -71,6 +74,8 @@ private[ui] class RDDOperationCluster(val id: String, val name: String) {
7174

7275
private[ui] object RDDOperationGraph extends Logging {
7376

77+
val STAGE_CLUSTER_PREFIX = "stage_"
78+
7479
/**
7580
* Construct a RDDOperationGraph for a given stage.
7681
*
@@ -88,7 +93,8 @@ private[ui] object RDDOperationGraph extends Logging {
8893
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
8994

9095
// Root cluster is the stage cluster
91-
val stageClusterId = s"stage_${stage.stageId}"
96+
// Use a special prefix here to differentiate this cluster from other operation clusters
97+
val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId
9298
val stageClusterName = s"Stage ${stage.stageId}" +
9399
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
94100
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)

core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraphListener.scala

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,15 @@ import org.apache.spark.ui.SparkUI
2727
* A SparkListener that constructs a DAG of RDD operations.
2828
*/
2929
private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListener {
30+
31+
// Note: the fate of jobs and stages are tied. This means when we clean up a job,
32+
// we always clean up all of its stages. Similarly, when we clean up a stage, we
33+
// always clean up its job (and, transitively, other stages in the same job).
3034
private[ui] val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
35+
private[ui] val jobIdToSkippedStageIds = new mutable.HashMap[Int, Seq[Int]]
36+
private[ui] val stageIdToJobId = new mutable.HashMap[Int, Int]
3137
private[ui] val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
38+
private[ui] val completedStageIds = new mutable.HashSet[Int]
3239

3340
// Keep track of the order in which these are inserted so we can remove old ones
3441
private[ui] val jobIds = new mutable.ArrayBuffer[Int]
@@ -40,16 +47,23 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
4047
private val retainedStages =
4148
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
4249

43-
/** Return the graph metadata for the given stage, or None if no such information exists. */
50+
/**
51+
* Return the graph metadata for all stages in the given job.
52+
* An empty list is returned if one or more of its stages has been cleaned up.
53+
*/
4454
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
45-
val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
46-
val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
47-
// If the metadata for some stages have been removed, do not bother rendering this job
48-
if (_stageIds.size != graphs.size) {
49-
Seq.empty
50-
} else {
51-
graphs
55+
val skippedStageIds = jobIdToSkippedStageIds.get(jobId).getOrElse(Seq.empty)
56+
val graphs = jobIdToStageIds.get(jobId)
57+
.getOrElse(Seq.empty)
58+
.flatMap { sid => stageIdToGraph.get(sid) }
59+
// Mark any skipped stages as such
60+
graphs.foreach { g =>
61+
val stageId = g.rootCluster.id.replaceAll(RDDOperationGraph.STAGE_CLUSTER_PREFIX, "").toInt
62+
if (skippedStageIds.contains(stageId) && !g.rootCluster.name.contains("skipped")) {
63+
g.rootCluster.setName(g.rootCluster.name + " (skipped)")
64+
}
5265
}
66+
graphs
5367
}
5468

5569
/** Return the graph metadata for the given stage, or None if no such information exists. */
@@ -66,22 +80,68 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
6680
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted
6781

6882
stageInfos.foreach { stageInfo =>
69-
stageIds += stageInfo.stageId
70-
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
71-
// Remove state for old stages
72-
if (stageIds.size >= retainedStages) {
73-
val toRemove = math.max(retainedStages / 10, 1)
74-
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
75-
stageIds.trimStart(toRemove)
76-
}
83+
val stageId = stageInfo.stageId
84+
stageIds += stageId
85+
stageIdToJobId(stageId) = jobId
86+
stageIdToGraph(stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
87+
trimStagesIfNecessary()
88+
}
89+
90+
trimJobsIfNecessary()
91+
}
92+
93+
/** Keep track of stages that have completed. */
94+
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
95+
val stageId = stageCompleted.stageInfo.stageId
96+
if (stageIdToJobId.contains(stageId)) {
97+
// Note: Only do this if the stage has not already been cleaned up
98+
// Otherwise, we may never clean this stage from `completedStageIds`
99+
completedStageIds += stageCompleted.stageInfo.stageId
100+
}
101+
}
102+
103+
/** On job end, find all stages in this job that are skipped and mark them as such. */
104+
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
105+
val jobId = jobEnd.jobId
106+
jobIdToStageIds.get(jobId).foreach { stageIds =>
107+
val skippedStageIds = stageIds.filter { sid => !completedStageIds.contains(sid) }
108+
// Note: Only do this if the job has not already been cleaned up
109+
// Otherwise, we may never clean this job from `jobIdToSkippedStageIds`
110+
jobIdToSkippedStageIds(jobId) = skippedStageIds
77111
}
112+
}
113+
114+
/** Clean metadata for old stages if we have exceeded the number to retain. */
115+
private def trimStagesIfNecessary(): Unit = {
116+
if (stageIds.size >= retainedStages) {
117+
val toRemove = math.max(retainedStages / 10, 1)
118+
stageIds.take(toRemove).foreach { id => cleanStage(id) }
119+
stageIds.trimStart(toRemove)
120+
}
121+
}
78122

79-
// Remove state for old jobs
123+
/** Clean metadata for old jobs if we have exceeded the number to retain. */
124+
private def trimJobsIfNecessary(): Unit = {
80125
if (jobIds.size >= retainedJobs) {
81126
val toRemove = math.max(retainedJobs / 10, 1)
82-
jobIds.take(toRemove).foreach { id => jobIdToStageIds.remove(id) }
127+
jobIds.take(toRemove).foreach { id => cleanJob(id) }
83128
jobIds.trimStart(toRemove)
84129
}
85130
}
86131

132+
/** Clean metadata for the given stage, its job, and all other stages that belong to the job. */
133+
private[ui] def cleanStage(stageId: Int): Unit = {
134+
completedStageIds.remove(stageId)
135+
stageIdToGraph.remove(stageId)
136+
stageIdToJobId.remove(stageId).foreach { jobId => cleanJob(jobId) }
137+
}
138+
139+
/** Clean metadata for the given job and all stages that belong to it. */
140+
private[ui] def cleanJob(jobId: Int): Unit = {
141+
jobIdToSkippedStageIds.remove(jobId)
142+
jobIdToStageIds.remove(jobId).foreach { stageIds =>
143+
stageIds.foreach { stageId => cleanStage(stageId) }
144+
}
145+
}
146+
87147
}

0 commit comments

Comments
 (0)