@@ -17,6 +17,7 @@
package org.apache.spark.status.api.v1

import java.io.OutputStream
+import java.net.URI
import java.util.{List => JList}
import java.util.zip.ZipOutputStream
import javax.ws.rs._
@@ -49,7 +50,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {

  @GET
  @Path("executors")
-  def executorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(true))
+  def executorList(): Seq[ExecutorSummary] = fetchExecutors(true)

  @GET
  @Path("executors/{executorId}/threads")
@@ -76,7 +77,7 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {

  @GET
  @Path("allexecutors")
-  def allExecutorList(): Seq[ExecutorSummary] = withUI(_.store.executorList(false))
+  def allExecutorList(): Seq[ExecutorSummary] = fetchExecutors(false)

@Path("stages")
def stages(): Class[StagesResource] = classOf[StagesResource]
@@ -160,6 +161,62 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
    classOf[OneApplicationAttemptResource]
  }

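+  /**
+   * Fetch executors from the app status store; if this UI has a YARN log server
+   * configured, rewrite each executor's log links to point at the aggregated-log
+   * server instead of the NodeManager that originally served them.
+   */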
+  private def fetchExecutors(activeOnly: Boolean): Seq[ExecutorSummary] = {
+    withUI(ui => {
+      val tmpExecutorList = ui.store.executorList(activeOnly)
+      ui.yarnLogServerUrl.map(lurl =>
+        tmpExecutorList.map(withYarnLogServerLogs(toYarnLogServerUrl(lurl, ui.nmRpcPort)))
+      ).getOrElse(tmpExecutorList)
+    })
+  }

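+  // Rewrite a NodeManager log URL into a YARN log-server URL, e.g. (host names,
+  // ports, and IDs illustrative):
+  //   http://nm-host:8042/node/containerlogs/container_..._000002/alice/stderr
+  // becomes
+  //   <logServerUrl>/nm-host:<nmPort>/container_..._000002/container_..._000002/alice/stderr
+  // URLs without a container ID, or with nothing after it, are returned unchanged.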
+  private def toYarnLogServerUrl(logServerUrl: String, nmPort: Int)(nmLogUrl: String): String = {
+    val containerSuffixPos = nmLogUrl.indexOf("container_")
+    if (containerSuffixPos >= 0) {
+      val nodeId = URI.create(nmLogUrl).getHost + ":" + nmPort
+      val containerSuffix = nmLogUrl.substring(containerSuffixPos)
+      val containerEndPos = containerSuffix.indexOf("/")
+      if (containerEndPos >= 0) {
+        val container = containerSuffix.substring(0, containerEndPos)
+        s"$logServerUrl/$nodeId/$container/$containerSuffix"
+      } else {
+        nmLogUrl
+      }
+    } else {
+      nmLogUrl
+    }
+  }

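+  // ExecutorSummary is immutable, so build a copy with executorLogs passed through
+  // the rewrite function and every other field carried over unchanged.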
+  private def withYarnLogServerLogs(
+      logRewrite: String => String)(
+      info: ExecutorSummary): ExecutorSummary = {
+    new ExecutorSummary(
+      id = info.id,
+      hostPort = info.hostPort,
+      isActive = info.isActive,
+      rddBlocks = info.rddBlocks,
+      memoryUsed = info.memoryUsed,
+      diskUsed = info.diskUsed,
+      totalCores = info.totalCores,
+      maxTasks = info.maxTasks,
+      activeTasks = info.activeTasks,
+      failedTasks = info.failedTasks,
+      completedTasks = info.completedTasks,
+      totalTasks = info.totalTasks,
+      totalDuration = info.totalDuration,
+      totalGCTime = info.totalGCTime,
+      totalInputBytes = info.totalInputBytes,
+      totalShuffleRead = info.totalShuffleRead,
+      totalShuffleWrite = info.totalShuffleWrite,
+      isBlacklisted = info.isBlacklisted,
+      maxMemory = info.maxMemory,
+      addTime = info.addTime,
+      removeTime = info.removeTime,
+      removeReason = info.removeReason,
+      executorLogs = info.executorLogs.mapValues(logRewrite),
+      memoryMetrics = info.memoryMetrics
+    )
+  }
}

private[v1] class OneApplicationResource extends AbstractApplicationResource {
24 changes: 20 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,11 +17,12 @@

package org.apache.spark.ui

-import java.util.{Date, List => JList, ServiceLoader}
+import java.util.Date

-import scala.collection.JavaConverters._
+import org.apache.hadoop.yarn.conf.YarnConfiguration

-import org.apache.spark.{JobExecutionStatus, SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.status.AppStatusStore
@@ -31,7 +32,6 @@ import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.ExecutorsTab
import org.apache.spark.ui.jobs.{JobsTab, StagesTab}
import org.apache.spark.ui.storage.StorageTab
-import org.apache.spark.util.Utils

/**
* Top level user interface for a Spark application.
@@ -52,6 +52,22 @@ private[spark] class SparkUI private (

  val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)

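+  // Read the YARN settings needed for log-link rewriting. The regex assumes
+  // yarn.nodemanager.address is in host:port form; a value without a trailing
+  // port would make the partial-function match below throw a MatchError.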
+  private val yarnConf = SparkHadoopUtil.get.newConfiguration(conf)
+  private val portReg = "^.*:([0-9]+)$".r
+  private[spark] val nmRpcPort = Option(yarnConf.get(YarnConfiguration.NM_ADDRESS))
+    .map { case portReg(port) => port.toInt }
+    .getOrElse(0)

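+  // Rewrite links only on the history server (no live SparkContext), and only when
+  // YARN log aggregation is enabled and the NodeManager RPC port is known.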
+  private val isHistoryUI = sc.isEmpty
+  private val useAggregatedLogs = isHistoryUI && nmRpcPort > 0 &&
+    yarnConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false)

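+  // yarn.log.server.url; consumed by AbstractApplicationResource to rewrite executor
+  // log links. None leaves the NodeManager URLs untouched.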
+  private[spark] val yarnLogServerUrl = if (useAggregatedLogs) {
+    Option(yarnConf.get(YarnConfiguration.YARN_LOG_SERVER_URL))
+  } else {
+    None
+  }

  var appId: String = _

  private var streamingJobProgressListener: Option[SparkListener] = None
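
Note: to make the URL rewrite concrete, the sketch below inlines the same logic as the new toYarnLogServerUrl helper and applies it to a sample NodeManager log URL. The log-server URL, NodeManager host/port, container ID, and user are illustrative values, not taken from the patch.

import java.net.URI

object YarnLogUrlRewriteExample {
  // Mirrors the patch's toYarnLogServerUrl: swap the NodeManager host:port for the
  // aggregated-log server, repeating the container ID as in the URLs the patch builds.
  def rewrite(logServerUrl: String, nmPort: Int)(nmLogUrl: String): String = {
    val containerPos = nmLogUrl.indexOf("container_")
    if (containerPos < 0) return nmLogUrl // not a container log URL; leave as-is
    val nodeId = URI.create(nmLogUrl).getHost + ":" + nmPort
    val suffix = nmLogUrl.substring(containerPos) // "container_.../<user>/<file>"
    val slash = suffix.indexOf("/")
    if (slash < 0) nmLogUrl
    else s"$logServerUrl/$nodeId/${suffix.substring(0, slash)}/$suffix"
  }

  def main(args: Array[String]): Unit = {
    val nmLogUrl =
      "http://nm-host:8042/node/containerlogs/container_1520000000000_0001_01_000002/alice/stderr"
    // Prints: http://jhs-host:19888/jobhistory/logs/nm-host:45454/
    //   container_1520000000000_0001_01_000002/container_1520000000000_0001_01_000002/alice/stderr
    println(rewrite("http://jhs-host:19888/jobhistory/logs", 45454)(nmLogUrl))
  }
}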