
Commit 32e964c

Kostas Sakellis authored and Josh Rosen committed
SPARK-2450 Adds executor log links to Web UI
Adds links to stderr/stdout in the executor tab of the web UI for:

1) Standalone
2) Yarn client
3) Yarn cluster

This adds log URL support in a general way, to make it easy to extend to all the cluster managers. The log URLs are passed to the executor through environment variables with the SPARK_LOG_URL_ prefix, so additional logs besides stderr/stdout can also be added. To propagate this information to the UI we use the onExecutorAdded Spark listener event. Although this commit doesn't add log URLs when running on a Mesos cluster, it should be possible to add them using the same mechanism (a sketch of the convention follows below).

Author: Kostas Sakellis <[email protected]>
Author: Josh Rosen <[email protected]>

Closes #3486 from ksakellis/kostas-spark-2450 and squashes the following commits:

d190936 [Josh Rosen] Fix a few minor style / formatting nits. Reset listener after each test. Don't null listener out at end of main().
8673fe1 [Kostas Sakellis] CR feedback. Hide the log column if there are no logs available.
5bf6952 [Kostas Sakellis] [SPARK-2450] [CORE] Adds exeuctor log links to Web UI
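The convention is easy to demonstrate in isolation. Below is a minimal sketch, not part of this commit, of how a launcher advertises logs through SPARK_LOG_URL_* environment variables; `command` and `baseUrl` are placeholder values, and the extra "gclog" entry is a hypothetical illustration of adding a log type beyond stderr/stdout:

    // Any SPARK_LOG_URL_<NAME> variable set in the executor's environment is
    // later read back on the executor side and surfaced as a "<name>" link in
    // the web UI's Executors tab.
    val command = Seq("java", "-cp", "<classpath>",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend")
    val baseUrl = "http://worker-host:8081/logPage/?appId=app-1&executorId=0&logType="
    val builder = new ProcessBuilder(command: _*)
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // Hypothetical extra log type: nothing is hard-coded to stderr/stdout, so
    // this would simply show up as an additional "gclog" link.
    builder.environment.put("SPARK_LOG_URL_GCLOG", s"${baseUrl}gclog")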
1 parent 4cdb26c commit 32e964c

File tree: 18 files changed, +178 −30 lines changed

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 7 additions & 0 deletions

@@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
     val worker: ActorRef,
     val workerId: String,
     val host: String,
+    val webUiPort: Int,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
     // In case we are running this from within the Spark Shell, avoid creating a "scala"
     // parent process for the executor command
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
+
+    // Add webUI log urls
+    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
+    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
+
     process = builder.start()
     val header = "Spark Executor Command: %s\n%s\n\n".format(
       command.mkString("\"", "\" \"", "\""), "=" * 40)
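With this change a standalone worker points each executor's log links back at its own web UI. For example (hypothetical values), a worker on `worker-host` with web UI port 8081, running executor 0 of application app-20150202, would set SPARK_LOG_URL_STDERR to http://worker-host:8081/logPage/?appId=app-20150202&executorId=0&logType=stderr.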

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 1 addition & 0 deletions

@@ -362,6 +362,7 @@ private[spark] class Worker(
         self,
         workerId,
         host,
+        webUiPort,
         sparkHome,
         executorDir,
         akkaUrl,

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 1 deletion

@@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def preStart() {
     logInfo("Connecting to driver: " + driverUrl)
     driver = context.actorSelection(driverUrl)
-    driver ! RegisterExecutor(executorId, hostPort, cores)
+    driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
   }
 
+  def extractLogUrls: Map[String, String] = {
+    val prefix = "SPARK_LOG_URL_"
+    sys.env.filterKeys(_.startsWith(prefix))
+      .map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
+  }
+
   override def receiveWithLogging = {
     case RegisteredExecutor =>
       logInfo("Successfully registered with driver")

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 5 additions & 1 deletion

@@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
 
   // Executors to driver
-  case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
+  case class RegisterExecutor(
+      executorId: String,
+      hostPort: String,
+      cores: Int,
+      logUrls: Map[String, String])
     extends CoarseGrainedClusterMessage {
     Utils.checkHostPort(hostPort, "Expected host port")
   }

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions

@@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
     }
 
     def receiveWithLogging = {
-      case RegisterExecutor(executorId, hostPort, cores) =>
+      case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
         Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
         if (executorDataMap.contains(executorId)) {
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
         totalCoreCount.addAndGet(cores)
         totalRegisteredExecutors.addAndGet(1)
         val (host, _) = Utils.parseHostPort(hostPort)
-        val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
+        val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
         // This must be synchronized because variables mutated
         // in this block are read when requesting executors
         CoarseGrainedSchedulerBackend.this.synchronized {

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala

Lines changed: 3 additions & 2 deletions

@@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
     val executorAddress: Address,
     override val executorHost: String,
     var freeCores: Int,
-    override val totalCores: Int
-) extends ExecutorInfo(executorHost, totalCores)
+    override val totalCores: Int,
+    override val logUrlMap: Map[String, String]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
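A brief aside on the pattern here: a Scala subclass can override a superclass constructor `val` with its own constructor parameter, as long as the value is threaded through to the superclass constructor. A stripped-down sketch (names are illustrative, not from the commit):

    class Base(val totalCores: Int, val logUrlMap: Map[String, String])

    class Derived(
        override val totalCores: Int,
        override val logUrlMap: Map[String, String])
      extends Base(totalCores, logUrlMap)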

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala

Lines changed: 5 additions & 4 deletions

@@ -25,21 +25,22 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class ExecutorInfo(
     val executorHost: String,
-    val totalCores: Int
-) {
+    val totalCores: Int,
+    val logUrlMap: Map[String, String]) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
   override def equals(other: Any): Boolean = other match {
     case that: ExecutorInfo =>
       (that canEqual this) &&
         executorHost == that.executorHost &&
-        totalCores == that.totalCores
+        totalCores == that.totalCores &&
+        logUrlMap == that.logUrlMap
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores)
+    val state = Seq(executorHost, totalCores, logUrlMap)
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
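Since `equals` and `hashCode` now incorporate `logUrlMap`, two otherwise identical executors with different log URLs no longer compare equal. A small check with hypothetical values:

    import org.apache.spark.scheduler.cluster.ExecutorInfo

    val withLogs = new ExecutorInfo("host1", 4, Map("stderr" -> "http://host1:8081/stderr"))
    val withoutLogs = new ExecutorInfo("host1", 4, Map.empty)
    assert(withLogs != withoutLogs) // they differ only in logUrlMap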

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 2 additions & 1 deletion

@@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
       mesosTasks.foreach { case (slaveId, tasks) =>
         slaveIdToWorkerOffer.get(slaveId).foreach(o =>
           listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
-            new ExecutorInfo(o.host, o.cores)))
+            // TODO: Add support for log urls for Mesos
+            new ExecutorInfo(o.host, o.cores, Map.empty)))
         )
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
       }

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala

Lines changed: 26 additions & 5 deletions

@@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
 import org.apache.spark.util.Utils
 
 /** Summary information about an executor to display in the UI. */
-private case class ExecutorSummaryInfo(
+// Needs to be private[ui] because of a false positive MiMa failure.
+private[ui] case class ExecutorSummaryInfo(
     id: String,
     hostPort: String,
     rddBlocks: Int,
@@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
     totalInputBytes: Long,
     totalShuffleRead: Long,
     totalShuffleWrite: Long,
-    maxMemory: Long)
+    maxMemory: Long,
+    executorLogs: Map[String, String])
 
 private[ui] class ExecutorsPage(
     parent: ExecutorsTab,
@@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
     val diskUsed = storageStatusList.map(_.diskUsed).sum
     val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
     val execInfoSorted = execInfo.sortBy(_.id)
+    val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty
 
     val execTable =
       <table class={UIUtils.TABLE_CLASS_STRIPED}>
@@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
               Shuffle Write
             </span>
           </th>
+          {if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
           {if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
         </thead>
         <tbody>
-          {execInfoSorted.map(execRow)}
+          {execInfoSorted.map(execRow(_, logsExist))}
         </tbody>
       </table>
 
@@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
   }
 
   /** Render an HTML row representing an executor */
-  private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+  private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
     val maximumMemory = info.maxMemory
     val memoryUsed = info.memoryUsed
     val diskUsed = info.diskUsed
@@ -138,6 +142,21 @@ private[ui] class ExecutorsPage(
       <td sorttable_customkey={info.totalShuffleWrite.toString}>
         {Utils.bytesToString(info.totalShuffleWrite)}
       </td>
+      {
+        if (logsExist) {
+          <td>
+            {
+              info.executorLogs.map { case (logName, logUrl) =>
+                <div>
+                  <a href={logUrl}>
+                    {logName}
+                  </a>
+                </div>
+              }
+            }
+          </td>
+        }
+      }
       {
         if (threadDumpEnabled) {
           val encodedId = URLEncoder.encode(info.id, "UTF-8")
@@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
     val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
     val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
     val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
+    val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)
 
     new ExecutorSummaryInfo(
       execId,
@@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
       totalInputBytes,
       totalShuffleRead,
      totalShuffleWrite,
-      maxMem
+      maxMem,
+      executorLogs
     )
   }
 }
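A side note on the `logsExist` check above: `execInfo.filter(_.executorLogs.nonEmpty).nonEmpty` builds an intermediate collection; an equivalent short-circuiting form from the standard library would be:

    val logsExist = execInfo.exists(_.executorLogs.nonEmpty)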

core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala

Lines changed: 6 additions & 0 deletions

@@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
   val executorToOutputBytes = HashMap[String, Long]()
   val executorToShuffleRead = HashMap[String, Long]()
   val executorToShuffleWrite = HashMap[String, Long]()
+  val executorToLogUrls = HashMap[String, Map[String, String]]()
 
   def storageStatusList = storageStatusListener.storageStatusList
 
+  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
+    val eid = executorAdded.executorId
+    executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
+  }
+
   override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
     val eid = taskStart.taskInfo.executorId
     executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
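The same `onExecutorAdded` hook used by the UI is available to applications. A minimal sketch of an application-side listener that captures log URLs as executors register (class name is illustrative; `sc` is assumed to be an existing SparkContext):

    import scala.collection.mutable
    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}

    class ExecutorLogUrlListener extends SparkListener {
      private val logUrls = mutable.HashMap[String, Map[String, String]]()

      override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit =
        synchronized {
          // The log URL map travels inside ExecutorInfo on the listener bus.
          logUrls(executorAdded.executorId) = executorAdded.executorInfo.logUrlMap
        }
    }

    // Registration: sc.addSparkListener(new ExecutorLogUrlListener())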
