Skip to content

Commit 31aae06

Browse files
author
Andrew Or
committed
Extract visualization logic from listener
1 parent 83f9c58 commit 31aae06

File tree

8 files changed

+78
-70
lines changed

8 files changed

+78
-70
lines changed

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
2525
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
2626
import org.apache.spark.ui.jobs.{JobsTab, JobProgressListener, StagesTab}
2727
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
28-
import org.apache.spark.ui.viz.VisualizationListener
28+
import org.apache.spark.ui.scope.OperationGraphListener
2929

3030
/**
3131
* Top level user interface for a Spark application.
@@ -39,7 +39,7 @@ private[spark] class SparkUI private (
3939
val executorsListener: ExecutorsListener,
4040
val jobProgressListener: JobProgressListener,
4141
val storageListener: StorageListener,
42-
val visualizationListener: VisualizationListener,
42+
val operationGraphListener: OperationGraphListener,
4343
var appName: String,
4444
val basePath: String)
4545
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
@@ -149,16 +149,16 @@ private[spark] object SparkUI {
149149
val storageStatusListener = new StorageStatusListener
150150
val executorsListener = new ExecutorsListener(storageStatusListener)
151151
val storageListener = new StorageListener(storageStatusListener)
152-
val visualizationListener = new VisualizationListener(conf)
152+
val operationGraphListener = new OperationGraphListener(conf)
153153

154154
listenerBus.addListener(environmentListener)
155155
listenerBus.addListener(storageStatusListener)
156156
listenerBus.addListener(executorsListener)
157157
listenerBus.addListener(storageListener)
158-
listenerBus.addListener(visualizationListener)
158+
listenerBus.addListener(operationGraphListener)
159159

160160
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
161-
executorsListener, _jobProgressListener, storageListener, visualizationListener,
161+
executorsListener, _jobProgressListener, storageListener, operationGraphListener,
162162
appName, basePath)
163163
}
164164
}

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.{Locale, Date}
2323
import scala.xml.{Node, Text}
2424

2525
import org.apache.spark.Logging
26-
import org.apache.spark.ui.viz.VizGraph
26+
import org.apache.spark.ui.scope.RDDOperationGraph
2727

2828
/** Utility functions for generating XML pages with spark content. */
2929
private[spark] object UIUtils extends Logging {
@@ -332,12 +332,12 @@ private[spark] object UIUtils extends Logging {
332332
}
333333

334334
/** Return a "DAG visualization" DOM element that expands into a visualization for a stage. */
335-
def showDagVizForStage(stageId: Int, graph: Option[VizGraph]): Seq[Node] = {
335+
def showDagVizForStage(stageId: Int, graph: Option[RDDOperationGraph]): Seq[Node] = {
336336
showDagViz(graph.toSeq, forJob = false)
337337
}
338338

339339
/** Return a "DAG visualization" DOM element that expands into a visualization for a job. */
340-
def showDagVizForJob(jobId: Int, graphs: Seq[VizGraph]): Seq[Node] = {
340+
def showDagVizForJob(jobId: Int, graphs: Seq[RDDOperationGraph]): Seq[Node] = {
341341
showDagViz(graphs, forJob = true)
342342
}
343343

@@ -348,7 +348,7 @@ private[spark] object UIUtils extends Logging {
348348
* a format that is expected by spark-dag-viz.js. Any changes in the format here must be
349349
* reflected there.
350350
*/
351-
private def showDagViz(graphs: Seq[VizGraph], forJob: Boolean): Seq[Node] = {
351+
private def showDagViz(graphs: Seq[RDDOperationGraph], forJob: Boolean): Seq[Node] = {
352352
<div>
353353
<span class="expand-dag-viz" onclick={s"toggleDagViz($forJob);"}>
354354
<span class="expand-dag-viz-arrow arrow-closed"></span>
@@ -359,7 +359,7 @@ private[spark] object UIUtils extends Logging {
359359
{
360360
graphs.map { g =>
361361
<div class="stage-metadata" stageId={g.rootCluster.id} style="display:none">
362-
<div class="dot-file">{VizGraph.makeDotFile(g, forJob)}</div>
362+
<div class="dot-file">{RDDOperationGraph.makeDotFile(g, forJob)}</div>
363363
{ g.incomingEdges.map { e => <div class="incoming-edge">{e.fromId},{e.toId}</div> } }
364364
{ g.outgoingEdges.map { e => <div class="outgoing-edge">{e.fromId},{e.toId}</div> } }
365365
</div>

core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,13 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
304304
var content = summary
305305
val appStartTime = listener.startTime
306306
val executorListener = parent.executorListener
307-
val vizListener = parent.vizListener
307+
val operationGraphListener = parent.operationGraphListener
308308

309309
content ++= makeTimeline(activeStages ++ completedStages ++ failedStages,
310310
executorListener.executorIdToData, appStartTime)
311311

312-
content ++= UIUtils.showDagVizForJob(jobId, vizListener.getVizGraphsForJob(jobId))
312+
content ++= UIUtils.showDagVizForJob(
313+
jobId, operationGraphListener.getOperationGraphForJob(jobId))
313314

314315
if (shouldShowActiveStages) {
315316
content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++

core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ private[ui] class JobsTab(parent: SparkUI) extends SparkUITab(parent, "jobs") {
2626
val killEnabled = parent.killEnabled
2727
val jobProgresslistener = parent.jobProgressListener
2828
val executorListener = parent.executorsListener
29-
val vizListener = parent.visualizationListener
29+
val operationGraphListener = parent.operationGraphListener
3030

3131
def isFairScheduler: Boolean =
3232
jobProgresslistener.schedulingMode.exists(_ == SchedulingMode.FAIR)

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,14 @@ import org.apache.commons.lang3.StringEscapeUtils
2727
import org.apache.spark.executor.TaskMetrics
2828
import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
2929
import org.apache.spark.ui.jobs.UIData._
30-
import org.apache.spark.ui.viz.VizGraph
30+
import org.apache.spark.ui.scope.RDDOperationGraph
3131
import org.apache.spark.util.{Utils, Distribution}
3232
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
3333

3434
/** Page showing statistics and task list for a given stage */
3535
private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
3636
private val progressListener = parent.progressListener
37-
private val vizListener = parent.vizListener
37+
private val operationGraphListener = parent.operationGraphListener
3838

3939
def render(request: HttpServletRequest): Seq[Node] = {
4040
progressListener.synchronized {
@@ -171,7 +171,8 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
171171
</div>
172172
</div>
173173

174-
val dagViz = UIUtils.showDagVizForStage(stageId, vizListener.getVizGraphForStage(stageId))
174+
val dagViz = UIUtils.showDagVizForStage(
175+
stageId, operationGraphListener.getOperationGraphForStage(stageId))
175176

176177
val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
177178
def accumulableRow(acc: AccumulableInfo): Elem =

core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages"
2727
val sc = parent.sc
2828
val killEnabled = parent.killEnabled
2929
val progressListener = parent.jobProgressListener
30-
val vizListener = parent.visualizationListener
30+
val operationGraphListener = parent.operationGraphListener
3131

3232
attachPage(new AllStagesPage(this))
3333
attachPage(new StagePage(this))

core/src/main/scala/org/apache/spark/ui/viz/VisualizationListener.scala renamed to core/src/main/scala/org/apache/spark/ui/scope/OperationGraphListener.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,47 +15,46 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.ui.viz
18+
package org.apache.spark.ui.scope
1919

2020
import scala.collection.mutable
21-
import scala.xml.{Node, Unparsed}
2221

2322
import org.apache.spark.SparkConf
2423
import org.apache.spark.scheduler._
2524
import org.apache.spark.ui.SparkUI
2625

2726
/**
28-
* A SparkListener that constructs the RDD DAG visualization for the UI.
27+
* A SparkListener that constructs a DAG of RDD operations.
2928
*/
30-
private[ui] class VisualizationListener(conf: SparkConf) extends SparkListener {
29+
private[ui] class OperationGraphListener(conf: SparkConf) extends SparkListener {
3130
private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
32-
private val stageIdToGraph = new mutable.HashMap[Int, VizGraph]
31+
private val stageIdToGraph = new mutable.HashMap[Int, RDDOperationGraph]
3332
private val stageIds = new mutable.ArrayBuffer[Int]
3433

3534
// How many jobs or stages to retain graph metadata for
3635
private val retainedStages =
3736
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
3837

3938
/** Return the graphs associated with the given job, or an empty sequence if no such information exists. */
40-
def getVizGraphsForJob(jobId: Int): Seq[VizGraph] = {
39+
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
4140
jobIdToStageIds.get(jobId)
4241
.map { sids => sids.flatMap { sid => stageIdToGraph.get(sid) } }
4342
.getOrElse { Seq.empty }
4443
}
4544

4645
/** Return the graph metadata for the given stage, or None if no such information exists. */
47-
def getVizGraphForStage(stageId: Int): Option[VizGraph] = {
46+
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
4847
stageIdToGraph.get(stageId)
4948
}
5049

51-
/** On job start, construct a VizGraph for each stage in the job for display later. */
50+
/** On job start, construct an RDDOperationGraph for each stage in the job for display later. */
5251
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
5352
val jobId = jobStart.jobId
5453
val stageInfos = jobStart.stageInfos
5554

5655
stageInfos.foreach { stageInfo =>
5756
stageIds += stageInfo.stageId
58-
stageIdToGraph(stageInfo.stageId) = VizGraph.makeVizGraph(stageInfo)
57+
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
5958
}
6059
jobIdToStageIds(jobId) = stageInfos.map(_.stageId).sorted
6160

core/src/main/scala/org/apache/spark/ui/viz/VizGraph.scala renamed to core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala

Lines changed: 51 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,78 +15,82 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.ui.viz
18+
package org.apache.spark.ui.scope
1919

2020
import scala.collection.mutable
2121
import scala.collection.mutable.ListBuffer
2222

2323
import org.apache.spark.Logging
24-
import org.apache.spark.rdd.OperatorScope
2524
import org.apache.spark.scheduler.StageInfo
2625

2726
/**
28-
* A representation of a generic cluster graph used for storing visualization information.
27+
* A representation of a generic cluster graph used for storing information on RDD operations.
2928
*
3029
* Each graph is defined with a set of edges and a root cluster, which may contain children
3130
* nodes and children clusters. Additionally, a graph may also have edges that enter or exit
3231
* the graph from nodes that belong to adjacent graphs.
3332
*/
34-
private[ui] case class VizGraph(
35-
edges: Seq[VizEdge],
36-
outgoingEdges: Seq[VizEdge],
37-
incomingEdges: Seq[VizEdge],
38-
rootCluster: VizCluster)
33+
private[ui] case class RDDOperationGraph(
34+
edges: Seq[RDDOperationEdge],
35+
outgoingEdges: Seq[RDDOperationEdge],
36+
incomingEdges: Seq[RDDOperationEdge],
37+
rootCluster: RDDOperationCluster)
3938

40-
/** A node in a VizGraph. This represents an RDD. */
41-
private[ui] case class VizNode(id: Int, name: String)
39+
/** A node in an RDDOperationGraph. This represents an RDD. */
40+
private[ui] case class RDDOperationNode(id: Int, name: String)
4241

43-
/** A directed edge connecting two nodes in a VizGraph. This represents an RDD dependency. */
44-
private[ui] case class VizEdge(fromId: Int, toId: Int)
42+
/**
43+
* A directed edge connecting two nodes in an RDDOperationGraph.
44+
* This represents an RDD dependency.
45+
*/
46+
private[ui] case class RDDOperationEdge(fromId: Int, toId: Int)
4547

4648
/**
47-
* A cluster that groups nodes together in a VizGraph.
49+
* A cluster that groups nodes together in an RDDOperationGraph.
4850
*
4951
* This represents any grouping of RDDs, including operator scopes (e.g. textFile, flatMap),
5052
* stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters.
5153
*/
52-
private[ui] class VizCluster(val id: String, val name: String) {
53-
private val _childrenNodes = new ListBuffer[VizNode]
54-
private val _childrenClusters = new ListBuffer[VizCluster]
55-
56-
def childrenNodes: Seq[VizNode] = _childrenNodes.iterator.toSeq
57-
def childrenClusters: Seq[VizCluster] = _childrenClusters.iterator.toSeq
58-
def attachChildNode(childNode: VizNode): Unit = { _childrenNodes += childNode }
59-
def attachChildCluster(childCluster: VizCluster): Unit = { _childrenClusters += childCluster }
54+
private[ui] class RDDOperationCluster(val id: String, val name: String) {
55+
private val _childrenNodes = new ListBuffer[RDDOperationNode]
56+
private val _childrenClusters = new ListBuffer[RDDOperationCluster]
57+
58+
def childrenNodes: Seq[RDDOperationNode] = _childrenNodes.iterator.toSeq
59+
def childrenClusters: Seq[RDDOperationCluster] = _childrenClusters.iterator.toSeq
60+
def attachChildNode(childNode: RDDOperationNode): Unit = { _childrenNodes += childNode }
61+
def attachChildCluster(childCluster: RDDOperationCluster): Unit = {
62+
_childrenClusters += childCluster
63+
}
6064
}
6165

62-
private[ui] object VizGraph extends Logging {
66+
private[ui] object RDDOperationGraph extends Logging {
6367

6468
/**
65-
* Construct a VizGraph for a given stage.
69+
* Construct an RDDOperationGraph for a given stage.
6670
*
6771
* The root cluster represents the stage, and all children clusters represent RDD operations.
6872
* Each node represents an RDD, and each edge represents a dependency between two RDDs pointing
6973
* from the parent to the child.
7074
*
71-
* This does not currently merge common scopes across stages. This may be worth supporting in
72-
* the future when we decide to group certain stages within the same job under a common scope
73-
* (e.g. part of a SQL query).
75+
* This does not currently merge common operator scopes across stages. This may be worth
76+
* supporting in the future if we decide to group certain stages within the same job under
77+
* a common scope (e.g. part of a SQL query).
7478
*/
75-
def makeVizGraph(stage: StageInfo): VizGraph = {
76-
val edges = new ListBuffer[VizEdge]
77-
val nodes = new mutable.HashMap[Int, VizNode]
78-
val clusters = new mutable.HashMap[String, VizCluster] // cluster ID -> VizCluster
79+
def makeOperationGraph(stage: StageInfo): RDDOperationGraph = {
80+
val edges = new ListBuffer[RDDOperationEdge]
81+
val nodes = new mutable.HashMap[Int, RDDOperationNode]
82+
val clusters = new mutable.HashMap[String, RDDOperationCluster] // indexed by cluster ID
7983

8084
// Root cluster is the stage cluster
8185
val stageClusterId = s"stage_${stage.stageId}"
8286
val stageClusterName = s"Stage ${stage.stageId}" +
8387
{ if (stage.attemptId == 0) "" else s" (attempt ${stage.attemptId})" }
84-
val rootCluster = new VizCluster(stageClusterId, stageClusterName)
88+
val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName)
8589

8690
// Find nodes, edges, and operator scopes that belong to this stage
8791
stage.rddInfos.foreach { rdd =>
88-
edges ++= rdd.parentIds.map { parentId => VizEdge(parentId, rdd.id) }
89-
val node = nodes.getOrElseUpdate(rdd.id, VizNode(rdd.id, rdd.name))
92+
edges ++= rdd.parentIds.map { parentId => RDDOperationEdge(parentId, rdd.id) }
93+
val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode(rdd.id, rdd.name))
9094

9195
if (rdd.scope == null) {
9296
// This RDD has no encompassing scope, so we put it directly in the root cluster
@@ -99,7 +103,7 @@ private[ui] object VizGraph extends Logging {
99103
val rddClusters = rddScopes.map { scope =>
100104
val clusterId = scope.name + "_" + scope.id
101105
val clusterName = scope.name
102-
clusters.getOrElseUpdate(clusterId, new VizCluster(clusterId, clusterName))
106+
clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName))
103107
}
104108
// Build the cluster hierarchy for this RDD
105109
rddClusters.sliding(2).foreach { pc =>
@@ -117,10 +121,10 @@ private[ui] object VizGraph extends Logging {
117121

118122
// Classify each edge as internal, outgoing or incoming
119123
// This information is needed to reason about how stages relate to each other
120-
val internalEdges = new ListBuffer[VizEdge]
121-
val outgoingEdges = new ListBuffer[VizEdge]
122-
val incomingEdges = new ListBuffer[VizEdge]
123-
edges.foreach { case e: VizEdge =>
124+
val internalEdges = new ListBuffer[RDDOperationEdge]
125+
val outgoingEdges = new ListBuffer[RDDOperationEdge]
126+
val incomingEdges = new ListBuffer[RDDOperationEdge]
127+
edges.foreach { case e: RDDOperationEdge =>
124128
val fromThisGraph = nodes.contains(e.fromId)
125129
val toThisGraph = nodes.contains(e.toId)
126130
(fromThisGraph, toThisGraph) match {
@@ -132,7 +136,7 @@ private[ui] object VizGraph extends Logging {
132136
}
133137
}
134138

135-
VizGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
139+
RDDOperationGraph(internalEdges, outgoingEdges, incomingEdges, rootCluster)
136140
}
137141

138142
/**
@@ -145,7 +149,7 @@ private[ui] object VizGraph extends Logging {
145149
*
146150
* For the complete DOT specification, see http://www.graphviz.org/Documentation/dotguide.pdf.
147151
*/
148-
def makeDotFile(graph: VizGraph, forJob: Boolean): String = {
152+
def makeDotFile(graph: RDDOperationGraph, forJob: Boolean): String = {
149153
val dotFile = new StringBuilder
150154
dotFile.append("digraph G {\n")
151155
dotFile.append(makeDotSubgraph(graph.rootCluster, forJob, indent = " "))
@@ -159,21 +163,24 @@ private[ui] object VizGraph extends Logging {
159163
}
160164

161165
/**
162-
* Return the dot representation of a node.
166+
* Return the dot representation of a node in an RDDOperationGraph.
163167
*
164168
* On the job page, it is displayed as a small circle without labels.
165169
* On the stage page, it is displayed as a box with an embedded label.
166170
*/
167-
private def makeDotNode(node: VizNode, forJob: Boolean): String = {
171+
private def makeDotNode(node: RDDOperationNode, forJob: Boolean): String = {
168172
if (forJob) {
169173
s"""${node.id} [label=" " shape="circle" padding="5" labelStyle="font-size: 0"]"""
170174
} else {
171175
s"""${node.id} [label="${node.name} (${node.id})"]"""
172176
}
173177
}
174178

175-
/** Return the dot representation of a subgraph. */
176-
private def makeDotSubgraph(scope: VizCluster, forJob: Boolean, indent: String): String = {
179+
/** Return the dot representation of a subgraph in an RDDOperationGraph. */
180+
private def makeDotSubgraph(
181+
scope: RDDOperationCluster,
182+
forJob: Boolean,
183+
indent: String): String = {
177184
val subgraph = new StringBuilder
178185
subgraph.append(indent + s"subgraph cluster${scope.id} {\n")
179186
subgraph.append(indent + s""" label="${scope.name}";\n""")

0 commit comments

Comments
 (0)