diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
index 974697890dd0..79a81d3c7471 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.status.api.v1
 
 import java.io.OutputStream
+import java.net.URI
 import java.util.{List => JList}
 import java.util.zip.ZipOutputStream
 import javax.ws.rs._
@@ -49,7 +50,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
 
   @GET
   @Path("executors")
-  def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
+  def executorList(): Seq[ExecutorSummary] = fetchExecutors(true)
 
   @GET
   @Path("executors/{executorId}/threads")
@@ -76,7 +77,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
 
   @GET
   @Path("allexecutors")
-  def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
+  def allExecutorList(): Seq[ExecutorSummary] = fetchExecutors(false)
 
   @Path("stages")
   def stages(): Class[StagesResource] = classOf[StagesResource]
@@ -160,6 +161,62 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
     classOf[OneApplicationAttemptResource]
   }
 
+  private def fetchExecutors(activeOnly: Boolean): Seq[ExecutorSummary] = {
+    withUI(ui => {
+      val tmpExecutorList = ui.store.executorList(activeOnly)
+      ui.yarnLogServerUrl.map(lurl =>
+        tmpExecutorList.map(withYarnLogServerLogs(toYarnLogServerUrl(lurl, ui.nmRpcPort)))
+      ).getOrElse(tmpExecutorList)
+    })
+  }
+
+  private def toYarnLogServerUrl(logServerUrl: String, nmPort: Int)(nmLogUrl: String): String = {
+    val containerSuffixPos = nmLogUrl.indexOf("container_")
+    if (containerSuffixPos >= 0) {
+      val nodeId = URI.create(nmLogUrl).getHost + ":" + nmPort
+      val containerSuffix = nmLogUrl.substring(containerSuffixPos)
+      val containerEndPos = containerSuffix.indexOf("/")
+      if (containerEndPos >= 0) {
+        val container = containerSuffix.substring(0, containerEndPos)
+        s"$logServerUrl/$nodeId/$container/$containerSuffix"
+      } else {
+        nmLogUrl
+      }
+    } else {
+      nmLogUrl
+    }
+  }
+
+  private def withYarnLogServerLogs(
+      logRewrite: String => String)(
+      info: ExecutorSummary): ExecutorSummary = {
+    new ExecutorSummary(
+      id = info.id,
+      hostPort = info.hostPort,
+      isActive = info.isActive,
+      rddBlocks = info.rddBlocks,
+      memoryUsed = info.memoryUsed,
+      diskUsed = info.diskUsed,
+      totalCores = info.totalCores,
+      maxTasks = info.maxTasks,
+      activeTasks = info.activeTasks,
+      failedTasks = info.failedTasks,
+      completedTasks = info.completedTasks,
+      totalTasks = info.totalTasks,
+      totalDuration = info.totalDuration,
+      totalGCTime = info.totalGCTime,
+      totalInputBytes = info.totalInputBytes,
+      totalShuffleRead = info.totalShuffleRead,
+      totalShuffleWrite = info.totalShuffleWrite,
+      isBlacklisted = info.isBlacklisted,
+      maxMemory = info.maxMemory,
+      addTime = info.addTime,
+      removeTime = info.removeTime,
+      removeReason = info.removeReason,
+      executorLogs = info.executorLogs.mapValues(logRewrite),
+      memoryMetrics = info.memoryMetrics
+    )
+  }
 }
 
 private[v1] class OneApplicationResource extends AbstractApplicationResource {
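
For illustration, a minimal self-contained sketch of the rewrite that toYarnLogServerUrl performs. Every host name, port, and container id in it is hypothetical; the doubled container id reflects the <nodeId>/<containerId>/<containerId>/<user> path layout the YARN log server typically serves aggregated logs under:

  import java.net.URI

  // Standalone sketch of the toYarnLogServerUrl logic; all values are made up.
  object YarnLogUrlRewriteSketch {
    def rewrite(logServerUrl: String, nmPort: Int)(nmLogUrl: String): String = {
      val pos = nmLogUrl.indexOf("container_")
      if (pos < 0) return nmLogUrl // not a container log URL: leave untouched
      val nodeId = URI.create(nmLogUrl).getHost + ":" + nmPort
      val suffix = nmLogUrl.substring(pos) // "container_<id>/<user>/<file>?..."
      val end = suffix.indexOf("/")
      if (end < 0) nmLogUrl
      else s"$logServerUrl/$nodeId/${suffix.substring(0, end)}/$suffix"
    }

    def main(args: Array[String]): Unit = {
      val nmUrl = "http://node1.example.com:8042/node/containerlogs/" +
        "container_1111111111111_0001_01_000002/alice/stderr?start=-4096"
      println(rewrite("http://jhs.example.com:19888/jobhistory/logs", 45454)(nmUrl))
      // http://jhs.example.com:19888/jobhistory/logs/node1.example.com:45454/
      //   container_1111111111111_0001_01_000002/container_1111111111111_0001_01_000002/alice/stderr?start=-4096
    }
  }

Query-string tails such as ?start=-4096 ride along unchanged because they are part of the suffix, and URLs without a "container_" segment are returned as-is.
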
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index b44ac0ea1feb..8acfe7161ff5 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,11 +17,12 @@
 package org.apache.spark.ui
 
-import java.util.{Date, List => JList, ServiceLoader}
+import java.util.Date
 
-import scala.collection.JavaConverters._
+import org.apache.hadoop.yarn.conf.YarnConfiguration
 
-import org.apache.spark.{JobExecutionStatus, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.status.AppStatusStore
@@ -31,7 +32,6 @@ import org.apache.spark.ui.env.EnvironmentTab
 import org.apache.spark.ui.exec.ExecutorsTab
 import org.apache.spark.ui.jobs.{JobsTab, StagesTab}
 import org.apache.spark.ui.storage.StorageTab
-import org.apache.spark.util.Utils
 
 /**
  * Top level user interface for a Spark application.
@@ -52,6 +52,22 @@ private[spark] class SparkUI private (
 
   val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
 
+  private val yarnConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val portReg = "^.*:([0-9]+)$".r
+  private[spark] val nmRpcPort = Option(yarnConf.get(YarnConfiguration.NM_ADDRESS))
+    .collect { case portReg(port) => port.toInt }
+    .getOrElse(0)
+
+  private val isHistoryUI = sc.isEmpty
+  private val useAggregatedLogs = isHistoryUI && nmRpcPort > 0 &&
+    yarnConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)
+
+  private[spark] val yarnLogServerUrl = if (useAggregatedLogs) {
+    Option(yarnConf.get(YarnConfiguration.YARN_LOG_SERVER_URL))
+  } else {
+    None
+  }
+
   var appId: String = _
 
   private var streamingJobProgressListener: Option[SparkListener] = None
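
The rewrite only arms itself on the history server side: yarnLogServerUrl stays None unless the UI has no live SparkContext, yarn.log-aggregation-enable is set, and a port can be parsed out of yarn.nodemanager.address. A small runnable sketch of that port extraction, with hypothetical addresses:

  // Sketch of the nmRpcPort derivation in SparkUI; addresses are hypothetical.
  object NmPortSketch extends App {
    val portReg = "^.*:([0-9]+)$".r

    def nmRpcPort(nmAddress: Option[String]): Int =
      nmAddress.collect { case portReg(port) => port.toInt }.getOrElse(0)

    assert(nmRpcPort(Some("node1.example.com:45454")) == 45454)
    assert(nmRpcPort(Some("node1.example.com")) == 0) // no port: rewrite stays disabled
    assert(nmRpcPort(None) == 0)                      // yarn.nodemanager.address unset
  }

Falling back to 0 rather than failing on a port-less or malformed NodeManager address keeps SparkUI construction from breaking; it simply leaves the executor log links pointing at the NodeManager URLs as before.
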