Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ private[spark] class ExecutorRunner(
val worker: ActorRef,
val workerId: String,
val host: String,
val webUiPort: Int,
val sparkHome: File,
val executorDir: File,
val workerUrl: String,
Expand Down Expand Up @@ -134,6 +135,12 @@ private[spark] class ExecutorRunner(
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

// Add webUI log urls
val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like $host here might be a machine's hostname, but we probably want this to reflect SPARK_PUBLIC_DNS (or whatever the new system property equivalent is) so that we generate an externally-accessible URL.

builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

process = builder.start()
val header = "Spark Executor Command: %s\n%s\n\n".format(
command.mkString("\"", "\" \"", "\""), "=" * 40)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ private[spark] class Worker(
self,
workerId,
host,
webUiPort,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we can fix this for ephemeral ports by using webUi.boundPort here instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this fixes it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Argh, I shouldn't have let this get committed without making sure this comment had been addressed. This is still broken; I'm fixing it as part of my own followup PR, which adds more tests.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh no. I thought I addressed all the comments including this one. Do you want me to fix it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm working on a PR now that will fix this and an issue where this doesn't obey SPARK_PUBLIC_DNS (the latter is a bit trickier to test, unfortunately, since it needs some tricky SparkConf + Mockito usage to mock environment variables).

sparkHome,
executorDir,
akkaUrl,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend(
override def preStart() {
logInfo("Connecting to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
driver ! RegisterExecutor(executorId, hostPort, cores)
driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}

def extractLogUrls: Map[String, String] = {
val prefix = "SPARK_LOG_URL_"
sys.env.filterKeys(_.startsWith(prefix))
.map(e => (e._1.substring(prefix.length).toLowerCase, e._2))
}

override def receiveWithLogging = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ private[spark] object CoarseGrainedClusterMessages {
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage

// Executors to driver
case class RegisterExecutor(executorId: String, hostPort: String, cores: Int)
case class RegisterExecutor(
executorId: String,
hostPort: String,
cores: Int,
logUrls: Map[String, String])
extends CoarseGrainedClusterMessage {
Utils.checkHostPort(hostPort, "Expected host port")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
}

def receiveWithLogging = {
case RegisterExecutor(executorId, hostPort, cores) =>
case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
if (executorDataMap.contains(executorId)) {
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
Expand All @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val (host, _) = Utils.parseHostPort(hostPort)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores)
val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ private[cluster] class ExecutorData(
val executorAddress: Address,
override val executorHost: String,
var freeCores: Int,
override val totalCores: Int
) extends ExecutorInfo(executorHost, totalCores)
override val totalCores: Int,
override val logUrlMap: Map[String, String]
) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,22 @@ import org.apache.spark.annotation.DeveloperApi
@DeveloperApi
class ExecutorInfo(
val executorHost: String,
val totalCores: Int
) {
val totalCores: Int,
val logUrlMap: Map[String, String]) {

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]

override def equals(other: Any): Boolean = other match {
case that: ExecutorInfo =>
(that canEqual this) &&
executorHost == that.executorHost &&
totalCores == that.totalCores
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap
case _ => false
}

override def hashCode(): Int = {
val state = Seq(executorHost, totalCores)
val state = Seq(executorHost, totalCores, logUrlMap)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend(
mesosTasks.foreach { case (slaveId, tasks) =>
slaveIdToWorkerOffer.get(slaveId).foreach(o =>
listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
new ExecutorInfo(o.host, o.cores)))
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}
Expand Down
31 changes: 26 additions & 5 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils

/** Summary information about an executor to display in the UI. */
private case class ExecutorSummaryInfo(
// Needs to be private[ui] because of a false positive MiMa failure.
private[ui] case class ExecutorSummaryInfo(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this visibility change is necessary

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It turns out that this is somehow necessary for the MiMa checks to pass. We should add a comment to explain this.

id: String,
hostPort: String,
rddBlocks: Int,
Expand All @@ -40,7 +41,8 @@ private case class ExecutorSummaryInfo(
totalInputBytes: Long,
totalShuffleRead: Long,
totalShuffleWrite: Long,
maxMemory: Long)
maxMemory: Long,
executorLogs: Map[String, String])

private[ui] class ExecutorsPage(
parent: ExecutorsTab,
Expand All @@ -55,6 +57,7 @@ private[ui] class ExecutorsPage(
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

val execTable =
<table class={UIUtils.TABLE_CLASS_STRIPED}>
Expand All @@ -79,10 +82,11 @@ private[ui] class ExecutorsPage(
Shuffle Write
</span>
</th>
{if (logsExist) <th class="sorttable_nosort">Logs</th> else Seq.empty}
{if (threadDumpEnabled) <th class="sorttable_nosort">Thread Dump</th> else Seq.empty}
</thead>
<tbody>
{execInfoSorted.map(execRow)}
{execInfoSorted.map(execRow(_, logsExist))}
</tbody>
</table>

Expand All @@ -107,7 +111,7 @@ private[ui] class ExecutorsPage(
}

/** Render an HTML row representing an executor */
private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = {
val maximumMemory = info.maxMemory
val memoryUsed = info.memoryUsed
val diskUsed = info.diskUsed
Expand Down Expand Up @@ -138,6 +142,21 @@ private[ui] class ExecutorsPage(
<td sorttable_customkey={info.totalShuffleWrite.toString}>
{Utils.bytesToString(info.totalShuffleWrite)}
</td>
{
if (logsExist) {
<td>
{
info.executorLogs.map { case (logName, logUrl) =>
<div>
<a href={logUrl}>
{logName}
</a>
</div>
}
}
</td>
}
}
{
if (threadDumpEnabled) {
val encodedId = URLEncoder.encode(info.id, "UTF-8")
Expand Down Expand Up @@ -168,6 +187,7 @@ private[ui] class ExecutorsPage(
val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty)

new ExecutorSummaryInfo(
execId,
Expand All @@ -183,7 +203,8 @@ private[ui] class ExecutorsPage(
totalInputBytes,
totalShuffleRead,
totalShuffleWrite,
maxMem
maxMem,
executorLogs
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
val executorToOutputBytes = HashMap[String, Long]()
val executorToShuffleRead = HashMap[String, Long]()
val executorToShuffleWrite = HashMap[String, Long]()
val executorToLogUrls = HashMap[String, Map[String, String]]()

def storageStatusList = storageStatusListener.storageStatusList

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized {
val eid = executorAdded.executorId
executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap
}

override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ private[spark] object JsonProtocol {

def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores)
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this use Utils.jsonOption per the comment at the top of JsonProtocol when adding new fields? Also I guess there should be a backward compatibility test case added to JsonProtocolSuite

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I agree that we should have backwards-compatibility tests for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I was mistaken: we don't need backwards-compatibility tests because this ExecutorInfo class is new in 1.3. We should revert my commit to add the compatibility tests, since it's just adds clutter now.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is correct. The previous commit that was blocking this was my patch to add the ExecutorInfo. I totally blanked too so my mistake.

}

/** ------------------------------ *
Expand Down Expand Up @@ -792,7 +793,8 @@ private[spark] object JsonProtocol {
def executorInfoFromJson(json: JValue): ExecutorInfo = {
val executorHost = (json \ "Host").extract[String]
val totalCores = (json \ "Total Cores").extract[Int]
new ExecutorInfo(executorHost, totalCores)
val logUrls = mapFromJson(json \ "Log Urls").toMap
new ExecutorInfo(executorHost, totalCores, logUrls)
}

/** -------------------------------- *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite {
}

def createExecutorRunner(): ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123,
new File("sparkHome"), new File("workDir"), "akka://worker",
new SparkConf, Seq("localDir"), ExecutorState.RUNNING)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener}
import org.apache.spark.{SparkContext, LocalSparkContext}

class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {

/** Length of time to wait while draining listener events. */
val WAIT_TIMEOUT_MILLIS = 10000

before {
sc = new SparkContext("local-cluster[2,1,512]", "test")
}

test("verify log urls get propagated from workers") {
val listener = new SaveExecutorInfo
sc.addSparkListener(listener)

val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.map(_.toString)
rdd2.setName("Target RDD")
rdd2.count()

assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}

private class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123,
new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"),
ExecutorState.RUNNING)
val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
conf.set("spark.mesos.executor.home" , "/mesos-home")

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
Expand Down Expand Up @@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])

val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
EasyMock.replay(listenerBus)

val sc = EasyMock.createMock(classOf[SparkContext])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite {
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
val applicationEnd = SparkListenerApplicationEnd(42L)
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
new ExecutorInfo("Hostee.awesome.com", 11))
new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")

testEvent(stageSubmitted, stageSubmittedJsonString)
Expand All @@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite {
}

test("Dependent Classes") {
val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L))
testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L))
testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false))
testTaskMetrics(makeTaskMetrics(
33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
testExecutorInfo(new ExecutorInfo("host", 43))
testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))

// StorageLevel
testStorageLevel(StorageLevel.NONE)
Expand Down Expand Up @@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "exec1",
| "Executor Info": {
| "Host": "Hostee.awesome.com",
| "Total Cores": 11
| "Total Cores": 11,
| "Log Urls" : {
| "stderr" : "mystderr",
| "stdout" : "mystdout"
| }
| }
|}
"""
Expand Down
Loading