2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1991,7 +1991,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
// Note: this code assumes that the task scheduler has been initialized and has contacted
// the cluster manager to get an application ID (in case the cluster manager provides one).
listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
startTime, sparkUser, applicationAttemptId))
startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls))
}

/** Post the application end event */
@@ -49,4 +49,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationAttemptId(): Option[String] = None

/**
* Get the URLs for the driver logs. These URLs are used to display the links in the UI
* Executors tab for the driver.
* @return Map containing the log names and their respective URLs
*/
def getDriverLogUrls: Option[Map[String, String]] = None

}
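For orientation, a minimal sketch of what implementing the new hook looks like in an arbitrary backend. The class name and URLs are invented, and it assumes the trait's only abstract members in this version are start, stop, reviveOffers, and defaultParallelism; the YarnClusterSchedulerBackend change further down in this diff is the real implementation added by the patch.

    package org.apache.spark.scheduler

    // Hypothetical backend that advertises fixed driver log links via the new hook.
    private[spark] class StaticDriverLogBackend extends SchedulerBackend {
      override def start(): Unit = {}
      override def stop(): Unit = {}
      override def reviveOffers(): Unit = {}
      override def defaultParallelism(): Int = 1

      // Placeholder URLs only; a real backend derives these from its cluster manager.
      override def getDriverLogUrls: Option[Map[String, String]] = Some(Map(
        "stdout" -> "http://logs.example.com/driver/stdout",
        "stderr" -> "http://logs.example.com/driver/stderr"))
    }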
@@ -110,8 +110,13 @@ case class SparkListenerExecutorMetricsUpdate(
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationStart(appName: String, appId: Option[String],
time: Long, sparkUser: String, appAttemptId: Option[String]) extends SparkListenerEvent
case class SparkListenerApplicationStart(
appName: String,
appId: Option[String],
time: Long,
sparkUser: String,
appAttemptId: Option[String],
driverLogs: Option[Map[String, String]] = None) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
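Because the links now travel on SparkListenerApplicationStart, any SparkListener can pick them up. A small self-contained sketch (class name and output are illustrative, not part of the patch):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart}

    // Prints whatever driver log links the scheduler backend advertised, if any.
    class DriverLogLinkPrinter extends SparkListener {
      override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
        appStart.driverLogs.getOrElse(Map.empty).foreach { case (name, url) =>
          println(s"driver log '$name' -> $url")
        }
      }
    }

Such a listener can be registered through the spark.extraListeners configuration, which is how the YarnClusterSuite change at the bottom of this diff wires in its SaveExecutorInfo listener, or added to the SparkContext directly.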
12 changes: 11 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ui.exec

import scala.collection.mutable.HashMap

import org.apache.spark.ExceptionFailure
import org.apache.spark.{ExceptionFailure, SparkContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageStatus, StorageStatusListener}
@@ -73,6 +73,16 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp
uiData.finishReason = Some(executorRemoved.reason)
}

override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
applicationStart.driverLogs.foreach { logs =>
val storageStatus = storageStatusList.find { s =>
s.blockManagerId.executorId == SparkContext.LEGACY_DRIVER_IDENTIFIER ||
s.blockManagerId.executorId == SparkContext.DRIVER_IDENTIFIER
}
storageStatus.foreach { s => executorToLogUrls(s.blockManagerId.executorId) = logs.toMap }
Contributor: Why is the toMap necessary? Isn't logs already a Map?

Contributor (author): JsonProtocol returns a mutable.Map, while executorToLogUrls has its values typed as Predef.Map. We can either call toMap here or in JsonProtocol, as you can see here: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala#L845

Contributor (author): Do you want me to change it in JsonProtocol? I think it is fine here in this case, since this is where we are using it (unlike the other case, where it needs to go into an ExecutorInfo).

Contributor: OK, I think it's fine as-is then; I was just curious.
}
}
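For context on the toMap exchange above: executorToLogUrls stores values typed as the immutable Predef.Map, so if the map carried by the deserialized event is a mutable or more general collection.Map, one of the two call sites has to convert. A tiny standalone illustration (names and URL are not from the patch):

    import scala.collection.mutable

    // A (possibly mutable) map, as it might come out of deserialization...
    val parsed: mutable.Map[String, String] =
      mutable.Map("stderr" -> "http://nm.example.com:8042/node/containerlogs/.../stderr?start=0")
    // ...must be copied before it can be stored in a field typed as Predef.Map:
    val stored: Map[String, String] = parsed.toMap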

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val eid = taskStart.taskInfo.executorId
executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -196,7 +196,8 @@ private[spark] object JsonProtocol {
("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
("Timestamp" -> applicationStart.time) ~
("User" -> applicationStart.sparkUser) ~
("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing))
}

def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
@@ -570,7 +571,8 @@ private[spark] object JsonProtocol {
val time = (json \ "Timestamp").extract[Long]
val sparkUser = (json \ "User").extract[String]
val appAttemptId = Utils.jsonOption(json \ "App Attempt ID").map(_.extract[String])
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId)
val driverLogs = Utils.jsonOption(json \ "Driver Logs").map(mapFromJson)
SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs)
}
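The two JsonProtocol changes are symmetric: driverLogs is written under a "Driver Logs" key with mapToJson and read back with mapFromJson, the same helpers JsonProtocol uses for other string-to-string maps. A round-trip sketch with illustrative values (it assumes code compiled inside the org.apache.spark.util package, since JsonProtocol is private[spark]):

    package org.apache.spark.util

    import org.apache.spark.scheduler.SparkListenerApplicationStart

    object DriverLogsRoundTrip {
      def main(args: Array[String]): Unit = {
        val start = SparkListenerApplicationStart(
          "demo app", Some("application_1424448816043_0003"), 1L, "alice",
          Some("appattempt_1424448816043_0003_000001"),
          Some(Map("stderr" -> "http://nm.example.com:8042/node/containerlogs/.../stderr?start=0")))
        val json = JsonProtocol.applicationStartToJson(start)
        val parsed = JsonProtocol.applicationStartFromJson(json)
        // Content equality holds regardless of the concrete Map implementation returned.
        assert(parsed.driverLogs == start.driverLogs)
      }
    }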

def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
@@ -89,9 +89,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg

/** Returns the attempt ID. */
def getAttemptId(): ApplicationAttemptId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
val containerId = ConverterUtils.toContainerId(containerIdString)
containerId.getApplicationAttemptId()
YarnSparkHadoopUtil.get.getContainerId.getApplicationAttemptId()
}

/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
@@ -33,7 +33,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority}
import org.apache.hadoop.yarn.util.ConverterUtils

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
@@ -136,6 +137,10 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
tokenRenewer.foreach(_.stop())
}

private[spark] def getContainerId: ContainerId = {
val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
ConverterUtils.toContainerId(containerIdString)
}
}
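For reference, the new helper just parses the CONTAINER_ID environment variable that YARN sets for the container. The values in this sketch are made up but show the expected shape:

    // e.g. CONTAINER_ID = "container_1424448816043_0003_01_000001"
    val containerId = YarnSparkHadoopUtil.get.getContainerId
    // The enclosing attempt, e.g. appattempt_1424448816043_0003_000001,
    // is what YarnRMClient.getAttemptId above now returns.
    val attemptId = containerId.getApplicationAttemptId()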

object YarnSparkHadoopUtil {
@@ -17,10 +17,19 @@

package org.apache.spark.scheduler.cluster

import java.net.NetworkInterface

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.NodeState
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.SparkContext
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.IntParam
import org.apache.spark.util.{IntParam, Utils}

private[spark] class YarnClusterSchedulerBackend(
scheduler: TaskSchedulerImpl,
@@ -53,4 +62,70 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application attempt ID is not set.")
super.applicationAttemptId
}

override def getDriverLogUrls: Option[Map[String, String]] = {
var yarnClientOpt: Option[YarnClient] = None
var driverLogs: Option[Map[String, String]] = None
try {
val yarnConf = new YarnConfiguration(sc.hadoopConfiguration)
val containerId = YarnSparkHadoopUtil.get.getContainerId
yarnClientOpt = Some(YarnClient.createYarnClient())
yarnClientOpt.foreach { yarnClient =>
yarnClient.init(yarnConf)
yarnClient.start()

// For newer versions of YARN, we can find the HTTP address for a given node by getting a
// container report for a given container. But container reports came only in Hadoop 2.4,
// so we basically have to get the node reports for all nodes and find the one which runs
// this container. For that we have to compare the node's host against the current host.
// Since the host can have multiple addresses, we need to compare against all of them to
// find out if one matches.

// Get all the addresses of this node.
val addresses =
NetworkInterface.getNetworkInterfaces.asScala
.flatMap(_.getInetAddresses.asScala)
.toSeq

// Find a node report that matches one of the addresses
val nodeReport =
Contributor: Can you add a brief comment explaining what is happening here and what cases you are trying to cover?

yarnClient.getNodeReports(NodeState.RUNNING).asScala.find { x =>
val host = x.getNodeId.getHost
addresses.exists { address =>
address.getHostAddress == host ||
address.getHostName == host ||
address.getCanonicalHostName == host
}
}

// Now that we have found the report for the Node Manager that the AM is running on, we
// can get the base HTTP address for the Node manager from the report.
// The format used for the logs for each container is well-known and can be constructed
// using the NM's HTTP address and the container ID.
// The NM may be running several containers, but we can build the URL for the AM using
// the AM's container ID, which we already know.
nodeReport.foreach { report =>
val httpAddress = report.getHttpAddress
// lookup appropriate http scheme for container log urls
val yarnHttpPolicy = yarnConf.get(
YarnConfiguration.YARN_HTTP_POLICY_KEY,
YarnConfiguration.YARN_HTTP_POLICY_DEFAULT
)
val user = Utils.getCurrentUserName()
val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://"
val baseUrl = s"$httpScheme$httpAddress/node/containerlogs/$containerId/$user"
Contributor: Hari and I discussed this offline a bit, on how it works when you've got multiple containers on a node; it is just a bit confusing, so I suggested adding a comment here, something like: "The nodeReport gives us the httpAddress for the NodeManager, which may be shared by more than one container on that node. But we know we have the container for the driver because we use the containerId as well."

logDebug(s"Base URL for logs: $baseUrl")
driverLogs = Some(
Map("stderr" -> s"$baseUrl/stderr?start=0", "stdout" -> s"$baseUrl/stdout?start=0"))
}
}
} catch {
case e: Exception =>
logInfo("Node Report API is not available in the version of YARN being used, so AM" +
" logs link will not appear in application UI", e)
} finally {
yarnClientOpt.foreach(_.close())
}
driverLogs
}
}
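The net effect of the method above is a pair of links of roughly the following shape; the host, port, container ID, and user are placeholders here, since the real NodeManager web address comes from the node report and the container ID from the environment:

    http://nm-host.example.com:8042/node/containerlogs/container_1424448816043_0003_01_000001/alice/stderr?start=0
    http://nm-host.example.com:8042/node/containerlogs/container_1424448816043_0003_01_000001/alice/stdout?start=0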
@@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.io.Source

import com.google.common.base.Charsets.UTF_8
import com.google.common.io.ByteStreams
@@ -33,7 +34,8 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}

import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.util.Utils

/**
@@ -290,10 +292,15 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit

private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
var driverLogs: Option[collection.Map[String, String]] = None

override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}

override def onApplicationStart(appStart: SparkListenerApplicationStart): Unit = {
driverLogs = appStart.driverLogs
}
}

private object YarnClusterDriver extends Logging with Matchers {
Expand All @@ -314,6 +321,7 @@ private object YarnClusterDriver extends Logging with Matchers {
val sc = new SparkContext(new SparkConf()
.set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
val conf = sc.getConf
val status = new File(args(0))
var result = "failure"
try {
@@ -335,6 +343,20 @@ private object YarnClusterDriver extends Logging with Matchers {
executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}

// If we are running in yarn-cluster mode, verify that driver logs are downloadable.
if (conf.get("spark.master") == "yarn-cluster") {
assert(listener.driverLogs.nonEmpty)
val driverLogs = listener.driverLogs.get
assert(driverLogs.size === 2)
assert(driverLogs.containsKey("stderr"))
assert(driverLogs.containsKey("stdout"))
val stderr = driverLogs("stderr") // YARN puts everything in stderr.
val lines = Source.fromURL(stderr).getLines()
// Look for a line that contains YarnClusterSchedulerBackend, since that is guaranteed in
// cluster mode.
assert(lines.exists(_.contains("YarnClusterSchedulerBackend")))
}
}

}