From 5bf6952c1f158ee5a44b67c92f4f7bc0c0945872 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Tue, 16 Dec 2014 18:31:48 -0800 Subject: [PATCH 1/3] [SPARK-2450] [CORE] Adds exeuctor log links to Web UI Adds links to stderr/stdout in the executor tab of the webUI for: 1) Standalone 2) Yarn client 3) Yarn cluster This tries to add the log url support in a general way so as to make it easy to add support for all the cluster managers. This is done by using environment variables to pass to the executor the log urls. The SPARK_LOG_URL_ prefix is used and so additional logs besides stderr/stdout can also be added. To propagate this information to the UI we use the onExecutorAdded spark listener event. Although this commit doesn't add log urls when running on a mesos cluster, it should be possible to add using the same mechanism. --- .../spark/deploy/worker/ExecutorRunner.scala | 7 +++ .../apache/spark/deploy/worker/Worker.scala | 1 + .../CoarseGrainedExecutorBackend.scala | 8 ++- .../cluster/CoarseGrainedClusterMessage.scala | 4 +- .../CoarseGrainedSchedulerBackend.scala | 4 +- .../scheduler/cluster/ExecutorData.scala | 5 +- .../scheduler/cluster/ExecutorInfo.scala | 9 +-- .../cluster/mesos/MesosSchedulerBackend.scala | 3 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 17 +++++- .../apache/spark/ui/exec/ExecutorsTab.scala | 8 +++ .../org/apache/spark/util/JsonProtocol.scala | 6 +- .../spark/deploy/JsonProtocolSuite.scala | 2 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 59 +++++++++++++++++++ .../deploy/worker/ExecutorRunnerTest.scala | 2 +- .../mesos/MesosSchedulerBackendSuite.scala | 4 +- .../apache/spark/util/JsonProtocolSuite.scala | 12 +++- .../spark/deploy/yarn/ExecutorRunnable.scala | 12 +++- .../spark/deploy/yarn/YarnClusterSuite.scala | 30 +++++++++- 18 files changed, 166 insertions(+), 27 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index bc9f78b9e5c7..0add3064da45 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -43,6 +43,7 @@ private[spark] class ExecutorRunner( val worker: ActorRef, val workerId: String, val host: String, + val webUiPort: Int, val sparkHome: File, val executorDir: File, val workerUrl: String, @@ -134,6 +135,12 @@ private[spark] class ExecutorRunner( // In case we are running this from within the Spark Shell, avoid creating a "scala" // parent process for the executor command builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0") + + // Add webUI log urls + val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType=" + builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr") + builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout") + process = builder.start() val header = "Spark Executor Command: %s\n%s\n\n".format( command.mkString("\"", "\" \"", "\""), "=" * 40) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index b20f5c0c8289..10929eb51604 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -362,6 +362,7 @@ private[spark] class Worker( self, workerId, host, + webUiPort, sparkHome, executorDir, akkaUrl, diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index bc72c8970319..8a11c49389c3 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -49,10 +49,16 @@ private[spark] class CoarseGrainedExecutorBackend( override def preStart() { logInfo("Connecting to driver: " + driverUrl) driver = context.actorSelection(driverUrl) - driver ! RegisterExecutor(executorId, hostPort, cores) + driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } + def extractLogUrls : Map[String, String] = { + val prefix = "SPARK_LOG_URL_" + sys.env.filterKeys(_.startsWith(prefix)) + .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) + } + override def receiveWithLogging = { case RegisteredExecutor => logInfo("Successfully registered with driver") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 1da6fe976da5..dc7e52b28a70 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -39,8 +39,8 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver - case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends CoarseGrainedClusterMessage { + case class RegisterExecutor(executorId: String, hostPort: String, cores: Int, + logUrls : Map[String, String]) extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 103a5c053c28..9d2fb4f3b472 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -86,7 +86,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste } def receiveWithLogging = { - case RegisterExecutor(executorId, hostPort, cores) => + case RegisterExecutor(executorId, hostPort, cores, logUrls) => Utils.checkHostPort(hostPort, "Host port expected " + hostPort) if (executorDataMap.contains(executorId)) { sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId) @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste totalCoreCount.addAndGet(cores) totalRegisteredExecutors.addAndGet(1) val (host, _) = Utils.parseHostPort(hostPort) - val data = new ExecutorData(sender, sender.path.address, host, cores, cores) + val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls) // This must be synchronized because variables mutated // in this block are read when requesting executors CoarseGrainedSchedulerBackend.this.synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index eb52ddfb1eab..17d9c303ce62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -33,5 +33,6 @@ private[cluster] class ExecutorData( val executorAddress: Address, override val executorHost: String, var freeCores: Int, - override val totalCores: Int -) extends ExecutorInfo(executorHost, totalCores) + override val totalCores: Int, + override val logUrlMap : Map[String, String] +) extends ExecutorInfo(executorHost, totalCores, logUrlMap) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala index b4738e64c939..7f218566146a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala @@ -25,8 +25,8 @@ import org.apache.spark.annotation.DeveloperApi @DeveloperApi class ExecutorInfo( val executorHost: String, - val totalCores: Int -) { + val totalCores: Int, + val logUrlMap: Map[String, String]) { def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo] @@ -34,12 +34,13 @@ class ExecutorInfo( case that: ExecutorInfo => (that canEqual this) && executorHost == that.executorHost && - totalCores == that.totalCores + totalCores == that.totalCores && + logUrlMap == that.logUrlMap case _ => false } override def hashCode(): Int = { - val state = Seq(executorHost, totalCores) + val state = Seq(executorHost, totalCores, logUrlMap) state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c3c546be6da1..cfb6592e14aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -270,7 +270,8 @@ private[spark] class MesosSchedulerBackend( mesosTasks.foreach { case (slaveId, tasks) => slaveIdToWorkerOffer.get(slaveId).foreach(o => listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId, - new ExecutorInfo(o.host, o.cores))) + // TODO: Add support for log urls for Mesos + new ExecutorInfo(o.host, o.cores, Map.empty))) ) d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 363cb96de799..3b11a07ad0e5 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -26,7 +26,7 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Summary information about an executor to display in the UI. */ -private case class ExecutorSummaryInfo( +private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, rddBlocks: Int, @@ -40,7 +40,8 @@ private case class ExecutorSummaryInfo( totalInputBytes: Long, totalShuffleRead: Long, totalShuffleWrite: Long, - maxMemory: Long) + maxMemory: Long, + executorLogs : Map[String, String]) private[ui] class ExecutorsPage( parent: ExecutorsTab, @@ -79,6 +80,7 @@ private[ui] class ExecutorsPage( Shuffle Write + Logs {if (threadDumpEnabled) Thread Dump else Seq.empty} @@ -138,6 +140,13 @@ private[ui] class ExecutorsPage( {Utils.bytesToString(info.totalShuffleWrite)} + + { + info.executorLogs.map(entry => { +
{entry._1}
+ }) + } + { if (threadDumpEnabled) { val encodedId = URLEncoder.encode(info.id, "UTF-8") @@ -168,6 +177,7 @@ private[ui] class ExecutorsPage( val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L) val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L) val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L) + val executorLogs = listener.executorToLogUrls.getOrElse(execId, Map.empty) new ExecutorSummaryInfo( execId, @@ -183,7 +193,8 @@ private[ui] class ExecutorsPage( totalInputBytes, totalShuffleRead, totalShuffleWrite, - maxMem + maxMem, + executorLogs ) } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index dd1c2b78c409..64b174d326d9 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui.exec +import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} + import scala.collection.mutable.HashMap import org.apache.spark.ExceptionFailure @@ -51,9 +53,15 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToOutputBytes = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() + val executorToLogUrls = HashMap[String, Map[String, String]]() def storageStatusList = storageStatusListener.storageStatusList + override def onExecutorAdded(executorAdded : SparkListenerExecutorAdded) = synchronized { + val eid = executorAdded.executorId + executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap + } + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { val eid = taskStart.taskInfo.executorId executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 8e0e41ad3782..c8407bbcb780 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -383,7 +383,8 @@ private[spark] object JsonProtocol { def executorInfoToJson(executorInfo: ExecutorInfo): JValue = { ("Host" -> executorInfo.executorHost) ~ - ("Total Cores" -> executorInfo.totalCores) + ("Total Cores" -> executorInfo.totalCores) ~ + ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) } /** ------------------------------ * @@ -792,7 +793,8 @@ private[spark] object JsonProtocol { def executorInfoFromJson(json: JValue): ExecutorInfo = { val executorHost = (json \ "Host").extract[String] val totalCores = (json \ "Total Cores").extract[Int] - new ExecutorInfo(executorHost, totalCores) + val logUrls = mapFromJson(json \ "Log Urls").toMap + new ExecutorInfo(executorHost, totalCores, logUrls) } /** -------------------------------- * diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index aa65f7e8915e..ed02ca81e405 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -117,7 +117,7 @@ class JsonProtocolSuite extends FunSuite { } def createExecutorRunner(): ExecutorRunner = { - new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", + new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host", 123, new File("sparkHome"), new File("workDir"), "akka://worker", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala new file mode 100644 index 000000000000..ef22168de986 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener} +import org.apache.spark.{SparkContext, LocalSparkContext} +import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} + +import scala.collection.mutable + +class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext + with BeforeAndAfter with BeforeAndAfterAll { + + /** Length of time to wait while draining listener events. */ + val WAIT_TIMEOUT_MILLIS = 10000 + + before { + sc = new SparkContext("local-cluster[2,1,512]", "test") + } + + test("verify log urls get propagated from workers") { + val listener = new SaveExecutorInfo + sc.addSparkListener(listener) + + val rdd1 = sc.parallelize(1 to 100, 4) + val rdd2 = rdd1.map(_.toString) + rdd2.setName("Target RDD") + rdd2.count() + + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + listener.addedExecutorInfos.foreach(e => { + assert(e._2.logUrlMap.nonEmpty) + }) + } + + private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } + } +} diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index 6f233d7cf97a..76511699e5ac 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -32,7 +32,7 @@ class ExecutorRunnerTest extends FunSuite { val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")) val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl") - val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", + val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", 123, new File(sparkHome), new File("ooga"), "blah", new SparkConf, Seq("localDir"), ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder(appDesc.command, 512, sparkHome, er.substituteVariables) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala index f2ff98eb72da..46ab02bfef78 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala @@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea conf.set("spark.mesos.executor.home" , "/mesos-home") val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) @@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) val listenerBus = EasyMock.createMock(classOf[LiveListenerBus]) - listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2))) + listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) EasyMock.replay(listenerBus) val sc = EasyMock.createMock(classOf[SparkContext]) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 6577ebaa2e9a..842f54529baf 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -76,8 +76,9 @@ class JsonProtocolSuite extends FunSuite { val unpersistRdd = SparkListenerUnpersistRDD(12345) val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield") val applicationEnd = SparkListenerApplicationEnd(42L) + val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1", - new ExecutorInfo("Hostee.awesome.com", 11)) + new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap)) val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason") testEvent(stageSubmitted, stageSubmittedJsonString) @@ -100,13 +101,14 @@ class JsonProtocolSuite extends FunSuite { } test("Dependent Classes") { + val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap testRDDInfo(makeRddInfo(2, 3, 4, 5L, 6L)) testStageInfo(makeStageInfo(10, 20, 30, 40L, 50L)) testTaskInfo(makeTaskInfo(999L, 888, 55, 777L, false)) testTaskMetrics(makeTaskMetrics( 33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false)) testBlockManagerId(BlockManagerId("Hong", "Kong", 500)) - testExecutorInfo(new ExecutorInfo("host", 43)) + testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap)) // StorageLevel testStorageLevel(StorageLevel.NONE) @@ -1463,7 +1465,11 @@ class JsonProtocolSuite extends FunSuite { | "Executor ID": "exec1", | "Executor Info": { | "Host": "Hostee.awesome.com", - | "Total Cores": 11 + | "Total Cores": 11, + | "Log Urls" : { + | "stderr" : "mystderr", + | "stdout" : "mystdout" + | } | } |} """ diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index ee2002a35f52..408cf09b9bdf 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -56,7 +56,7 @@ class ExecutorRunnable( var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - lazy val env = prepareEnvironment + lazy val env = prepareEnvironment(container) def run = { logInfo("Starting Executor Container") @@ -254,7 +254,7 @@ class ExecutorRunnable( localResources } - private def prepareEnvironment: HashMap[String, String] = { + private def prepareEnvironment(container: Container): HashMap[String, String] = { val env = new HashMap[String, String]() val extraCp = sparkConf.getOption("spark.executor.extraClassPath") Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp) @@ -270,6 +270,14 @@ class ExecutorRunnable( YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) } + // Add log urls + sys.env.get("SPARK_USER").foreach { user => + val baseUrl = "http://%s/node/containerlogs/%s/%s" + .format(container.getNodeHttpAddress, ConverterUtils.toString(container.getId), user) + env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=0" + env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=0" + } + System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } env } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 7165918e1bfc..a999541dec27 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -20,6 +20,9 @@ package org.apache.spark.deploy.yarn import java.io.File import java.util.concurrent.TimeUnit +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} + import scala.collection.JavaConversions._ import com.google.common.base.Charsets @@ -30,9 +33,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils +import scala.collection.mutable + class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { // log4j configuration for the Yarn containers, so that their output is collected @@ -143,6 +147,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit var result = File.createTempFile("result", null, tempDir) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) checkResult(result) + + // verify log urls are present + YarnClusterDriver.listener.addedExecutorInfos.foreach(e => { + assert(e._2.logUrlMap.nonEmpty) + }) } test("run Spark in yarn-cluster mode") { @@ -156,6 +165,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit "--num-executors", "1") Client.main(args) checkResult(result) + + // verify log urls are present. + YarnClusterDriver.listener.addedExecutorInfos.foreach(e => { + assert(e._2.logUrlMap.nonEmpty) + }) } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -203,8 +217,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } +private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } +} + private object YarnClusterDriver extends Logging with Matchers { + val WAIT_TIMEOUT_MILLIS = 10000 + val listener = new SaveExecutorInfo + def main(args: Array[String]) = { if (args.length != 2) { System.err.println( @@ -216,12 +241,15 @@ private object YarnClusterDriver extends Logging with Matchers { System.exit(1) } + val sc = new SparkContext(new SparkConf().setMaster(args(0)) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + sc.addSparkListener(listener) val status = new File(args(1)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) data should be (Set(1, 2, 3, 4)) result = "success" } finally { From 8673fe1c62f4f85daca029e5d33628b62b2aa037 Mon Sep 17 00:00:00 2001 From: Kostas Sakellis Date: Tue, 27 Jan 2015 14:12:04 -0800 Subject: [PATCH 2/3] CR feedback. Hide the log column if there are no logs available --- .../CoarseGrainedExecutorBackend.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 23 ++++++++++++------- .../spark/deploy/yarn/YarnClusterSuite.scala | 17 ++++++-------- 3 files changed, 23 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 8a11c49389c3..3a42f8b15797 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -53,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend( context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) } - def extractLogUrls : Map[String, String] = { + def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) .map(e => (e._1.substring(prefix.length).toLowerCase, e._2)) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 3b11a07ad0e5..6c0de78fe99b 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -56,6 +56,7 @@ private[ui] class ExecutorsPage( val diskUsed = storageStatusList.map(_.diskUsed).sum val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId) val execInfoSorted = execInfo.sortBy(_.id) + val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty val execTable = @@ -80,11 +81,11 @@ private[ui] class ExecutorsPage( Shuffle Write - + {if (logsExist) else Seq.empty} {if (threadDumpEnabled) else Seq.empty} - {execInfoSorted.map(execRow)} + {execInfoSorted.map(execRow(_, logsExist))}
LogsLogsThread Dump
@@ -109,7 +110,7 @@ private[ui] class ExecutorsPage( } /** Render an HTML row representing an executor */ - private def execRow(info: ExecutorSummaryInfo): Seq[Node] = { + private def execRow(info: ExecutorSummaryInfo, logsExist: Boolean): Seq[Node] = { val maximumMemory = info.maxMemory val memoryUsed = info.memoryUsed val diskUsed = info.diskUsed @@ -140,13 +141,19 @@ private[ui] class ExecutorsPage( {Utils.bytesToString(info.totalShuffleWrite)} - { - info.executorLogs.map(entry => { -
{entry._1}
- }) + if (logsExist) { + + {info.executorLogs.map { entry => { +
+ + {entry._1} + +
} + }} + + } } - { if (threadDumpEnabled) { val encodedId = URLEncoder.encode(info.id, "UTF-8") diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index a999541dec27..0214263d5733 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -20,22 +20,19 @@ package org.apache.spark.deploy.yarn import java.io.File import java.util.concurrent.TimeUnit -import org.apache.spark.scheduler.cluster.ExecutorInfo -import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} - import scala.collection.JavaConversions._ +import scala.collection.mutable import com.google.common.base.Charsets import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils - -import scala.collection.mutable +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { @@ -167,9 +164,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit checkResult(result) // verify log urls are present. - YarnClusterDriver.listener.addedExecutorInfos.foreach(e => { + YarnClusterDriver.listener.addedExecutorInfos.foreach { e => { assert(e._2.logUrlMap.nonEmpty) - }) + }} } test("run Spark in yarn-cluster mode unsuccessfully") { From d190936f753ff66586f9aa3cc522fd9c5ba4a321 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 4 Feb 2015 12:38:10 -0800 Subject: [PATCH 3/3] Fix a few minor style / formatting nits. Reset listener after each test Don't null listener out at end of main(). --- .../cluster/CoarseGrainedClusterMessage.scala | 8 ++++++-- .../scheduler/cluster/ExecutorData.scala | 2 +- .../apache/spark/ui/exec/ExecutorsPage.scala | 19 +++++++++++-------- .../apache/spark/ui/exec/ExecutorsTab.scala | 4 +--- .../spark/deploy/LogUrlsStandaloneSuite.scala | 18 +++++++++--------- .../spark/deploy/yarn/YarnClusterSuite.scala | 18 +++++++++--------- 6 files changed, 37 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index dc7e52b28a70..9bf74f4be198 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -39,8 +39,12 @@ private[spark] object CoarseGrainedClusterMessages { case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage // Executors to driver - case class RegisterExecutor(executorId: String, hostPort: String, cores: Int, - logUrls : Map[String, String]) extends CoarseGrainedClusterMessage { + case class RegisterExecutor( + executorId: String, + hostPort: String, + cores: Int, + logUrls: Map[String, String]) + extends CoarseGrainedClusterMessage { Utils.checkHostPort(hostPort, "Expected host port") } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala index 17d9c303ce62..5e571efe7672 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala @@ -34,5 +34,5 @@ private[cluster] class ExecutorData( override val executorHost: String, var freeCores: Int, override val totalCores: Int, - override val logUrlMap : Map[String, String] + override val logUrlMap: Map[String, String] ) extends ExecutorInfo(executorHost, totalCores, logUrlMap) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 6c0de78fe99b..956608d7c0cb 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -26,6 +26,7 @@ import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage} import org.apache.spark.util.Utils /** Summary information about an executor to display in the UI. */ +// Needs to be private[ui] because of a false positive MiMa failure. private[ui] case class ExecutorSummaryInfo( id: String, hostPort: String, @@ -41,7 +42,7 @@ private[ui] case class ExecutorSummaryInfo( totalShuffleRead: Long, totalShuffleWrite: Long, maxMemory: Long, - executorLogs : Map[String, String]) + executorLogs: Map[String, String]) private[ui] class ExecutorsPage( parent: ExecutorsTab, @@ -144,13 +145,15 @@ private[ui] class ExecutorsPage( { if (logsExist) { - {info.executorLogs.map { entry => { -
- - {entry._1} - -
} - }} + { + info.executorLogs.map { case (logName, logUrl) => +
+ + {logName} + +
+ } + } } } diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 64b174d326d9..a38cb75fdd8c 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -17,8 +17,6 @@ package org.apache.spark.ui.exec -import org.apache.spark.{SparkEnv, Logging, SparkException, TaskState} - import scala.collection.mutable.HashMap import org.apache.spark.ExceptionFailure @@ -57,7 +55,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp def storageStatusList = storageStatusListener.storageStatusList - override def onExecutorAdded(executorAdded : SparkListenerExecutorAdded) = synchronized { + override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded) = synchronized { val eid = executorAdded.executorId executorToLogUrls(eid) = executorAdded.executorInfo.logUrlMap } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index ef22168de986..f33bdc73e40a 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -17,15 +17,15 @@ package org.apache.spark.deploy +import scala.collection.mutable + +import org.scalatest.{BeforeAndAfter, FunSuite} + import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListener} import org.apache.spark.{SparkContext, LocalSparkContext} -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} -import scala.collection.mutable - -class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext - with BeforeAndAfter with BeforeAndAfterAll { +class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext with BeforeAndAfter { /** Length of time to wait while draining listener events. */ val WAIT_TIMEOUT_MILLIS = 10000 @@ -44,15 +44,15 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext rdd2.count() assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) - listener.addedExecutorInfos.foreach(e => { - assert(e._2.logUrlMap.nonEmpty) - }) + listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } private class SaveExecutorInfo extends SparkListener { val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() - override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + override def onExecutorAdded(executor: SparkListenerExecutorAdded) { addedExecutorInfos(executor.executorId) = executor.executorInfo } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 0214263d5733..eda40efc4c77 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { @@ -146,9 +146,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit checkResult(result) // verify log urls are present - YarnClusterDriver.listener.addedExecutorInfos.foreach(e => { - assert(e._2.logUrlMap.nonEmpty) - }) + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode") { @@ -164,9 +164,9 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit checkResult(result) // verify log urls are present. - YarnClusterDriver.listener.addedExecutorInfos.foreach { e => { - assert(e._2.logUrlMap.nonEmpty) - }} + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -225,7 +225,7 @@ private class SaveExecutorInfo extends SparkListener { private object YarnClusterDriver extends Logging with Matchers { val WAIT_TIMEOUT_MILLIS = 10000 - val listener = new SaveExecutorInfo + var listener: SaveExecutorInfo = null def main(args: Array[String]) = { if (args.length != 2) { @@ -238,7 +238,7 @@ private object YarnClusterDriver extends Logging with Matchers { System.exit(1) } - + listener = new SaveExecutorInfo val sc = new SparkContext(new SparkConf().setMaster(args(0)) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) sc.addSparkListener(listener)