diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java index 731f6fc767dfd..579e7ff320f5c 100644 --- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java +++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java @@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe onEvent(speculativeTask); } + @Override + public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) { + onEvent(event); + } + @Override public void onOtherEvent(SparkListenerEvent event) { onEvent(event); diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html index 0b26bfc5b2d82..0729dfe1cef72 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html +++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage-template.html @@ -89,6 +89,7 @@

Executors

Disk Used Cores Resources + Resource Profile Id Active Tasks Failed Tasks Complete Tasks diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js index ec57797ba0909..520edb9cc3e34 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js @@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) { } var sumOptionalColumns = [3, 4]; -var execOptionalColumns = [5, 6, 9]; +var execOptionalColumns = [5, 6, 9, 10]; var execDataTable; var sumDataTable; @@ -415,6 +415,7 @@ $(document).ready(function () { {data: 'diskUsed', render: formatBytes}, {data: 'totalCores'}, {name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false}, + {name: 'resourceProfileIdCol', data: 'resourceProfileId'}, { data: 'activeTasks', "fnCreatedCell": function (nTd, sData, oData, iRow, iCol) { @@ -461,7 +462,8 @@ $(document).ready(function () { "columnDefs": [ {"visible": false, "targets": 5}, {"visible": false, "targets": 6}, - {"visible": false, "targets": 9} + {"visible": false, "targets": 9}, + {"visible": false, "targets": 10} ], "deferRender": true }; @@ -570,6 +572,7 @@ $(document).ready(function () { "
On Heap Memory
" + "
Off Heap Memory
" + "
Resources
" + + "
Resource Profile Id
" + ""); reselectCheckboxesBasedOnTaskTableState(); diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 5c92527b7b80e..38d7319b1f0ef 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging { } _listenerBus = new LiveListenerBus(_conf) - _resourceProfileManager = new ResourceProfileManager(_conf) + _resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus) // Initialize the app status store and listener before SparkEnv is created so that it gets // all events. diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala index 741050027fc6b..7973652b3e254 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryAppStatusStore.scala @@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore( source.totalGCTime, source.totalInputBytes, source.totalShuffleRead, source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime, source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics, - source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources) + source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources, + source.resourceProfileId) } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index c3e244474a692..f365548c75359 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.annotation.Evolving import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Tests._ +import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded} import org.apache.spark.util.Utils import org.apache.spark.util.Utils.isTesting /** * Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles * and everywhere else we can use the ResourceProfile Id to save on space. - * Note we never remove a resource profile at this point. Its expected this number if small + * Note we never remove a resource profile at this point. Its expected this number is small * so this shouldn't be much overhead. 
 */
 @Evolving
-private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
+private[spark] class ResourceProfileManager(sparkConf: SparkConf,
+    listenerBus: LiveListenerBus) extends Logging {
   private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()
   private val (readLock, writeLock) = {
@@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
       // force the computation of maxTasks and limitingResource now so we don't have cost later
       rp.limitingResource(sparkConf)
       logInfo(s"Added ResourceProfile id: ${rp.id}")
+      listenerBus.post(SparkListenerResourceProfileAdded(rp))
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 24e2a5e4d4a62..b2e9a0b2a04e8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
     }
   }
 
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
+    logEvent(event, flushLogger = true)
+  }
+
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     if (event.logEvent) {
       logEvent(event, flushLogger = true)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index c150b0341500c..62d54f3b74a47 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
 import org.apache.spark.TaskEndReason
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
 
@@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
 @DeveloperApi
 case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
 
+@DeveloperApi
+case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
+  extends SparkListenerEvent
+
 /**
  * Interface for listening to events from the Spark scheduler. Most applications should probably
  * extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
@@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
    * Called when other events like SQL-specific events are posted.
    */
   def onOtherEvent(event: SparkListenerEvent): Unit
+
+  /**
+   * Called when a Resource Profile is added to the manager.
+   */
+  def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
 }
@@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface {
       speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = { }
+
+  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { }
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
index 8f6b7ad309602..3d316c948db7e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala
@@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
         listener.onBlockUpdated(blockUpdated)
       case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
         listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
+      case resourceProfileAdded: SparkListenerResourceProfileAdded =>
+        listener.onResourceProfileAdded(resourceProfileAdded)
       case _ => listener.onOtherEvent(event)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index c3f22f32993a8..f7b0e9b62fc29 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -28,6 +28,7 @@ import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CPUS_PER_TASK
 import org.apache.spark.internal.config.Status._
+import org.apache.spark.resource.ResourceProfile.CPUS
 import org.apache.spark.scheduler._
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage._
@@ -51,7 +52,7 @@ private[spark] class AppStatusListener(
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
-  private var coresPerTask: Int = 1
+  private var defaultCpusPerTask: Int = 1
 
   // How often to update live entities. -1 means "never update" when replaying applications,
   // meaning only the last write will happen.
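
Not part of the patch itself: since onResourceProfileAdded is now part of the listener API, any user-registered listener can react to new profiles. A minimal sketch, assuming the listener is attached with SparkContext.addSparkListener and that logging the request maps is all we want:

import org.apache.spark.scheduler.{SparkListener, SparkListenerResourceProfileAdded}

// Logs every resource profile registered by the application, using only the
// fields carried by the new event (the profile id and its request maps).
class ResourceProfileLoggingListener extends SparkListener {
  override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
    val rp = event.resourceProfile
    println(s"ResourceProfile ${rp.id} added: " +
      s"executors=${rp.executorResources}, tasks=${rp.taskResources}")
  }
}

// sc.addSparkListener(new ResourceProfileLoggingListener())   // sc: an existing SparkContext
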
For live applications, this avoids a few @@ -76,6 +77,7 @@ private[spark] class AppStatusListener( private val liveTasks = new HashMap[Long, LiveTask]() private val liveRDDs = new HashMap[Int, LiveRDD]() private val pools = new HashMap[String, SchedulerPool]() + private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]() private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" // Keep the active executor count as a separate variable to avoid having to do synchronization @@ -145,6 +147,20 @@ private[spark] class AppStatusListener( } } + override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { + val maxTasks = if (event.resourceProfile.isCoresLimitKnown) { + Some(event.resourceProfile.maxTasksPerExecutor(conf)) + } else { + None + } + val liveRP = new LiveResourceProfile(event.resourceProfile.id, + event.resourceProfile.executorResources, event.resourceProfile.taskResources, maxTasks) + liveResourceProfiles(event.resourceProfile.id) = liveRP + val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId, + liveRP.executorResources, liveRP.taskResources) + kvstore.write(new ResourceProfileWrapper(rpInfo)) + } + override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = { val details = event.environmentDetails @@ -159,10 +175,11 @@ private[spark] class AppStatusListener( details.getOrElse("Spark Properties", Nil), details.getOrElse("Hadoop Properties", Nil), details.getOrElse("System Properties", Nil), - details.getOrElse("Classpath Entries", Nil)) + details.getOrElse("Classpath Entries", Nil), + Nil) - coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt) - .getOrElse(coresPerTask) + defaultCpusPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt) + .getOrElse(defaultCpusPerTask) kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo)) } @@ -197,10 +214,16 @@ private[spark] class AppStatusListener( exec.host = event.executorInfo.executorHost exec.isActive = true exec.totalCores = event.executorInfo.totalCores - exec.maxTasks = event.executorInfo.totalCores / coresPerTask + val rpId = event.executorInfo.resourceProfileId + val liveRP = liveResourceProfiles.get(rpId) + val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS)) + .map(_.amount.toInt).getOrElse(defaultCpusPerTask) + val maxTasksPerExec = liveRP.flatMap(_.maxTasksPerExecutor) + exec.maxTasks = maxTasksPerExec.getOrElse(event.executorInfo.totalCores / cpusPerTask) exec.executorLogs = event.executorInfo.logUrlMap exec.resources = event.executorInfo.resourcesInfo exec.attributes = event.executorInfo.attributes + exec.resourceProfileId = rpId liveUpdate(exec, System.nanoTime()) } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 42d4071d61c8b..ea033d0c890ac 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException} +import org.apache.spark.resource.ResourceProfileManager import org.apache.spark.status.api.v1 import org.apache.spark.ui.scope._ import org.apache.spark.util.Utils @@ -51,6 +52,10 @@ private[spark] class AppStatusStore( store.read(klass, klass.getName()).info } + def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = { + 
store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq + } + def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = { val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info) if (statuses != null && !statuses.isEmpty()) { @@ -486,7 +491,8 @@ private[spark] class AppStatusStore( accumulatorUpdates = stage.accumulatorUpdates, tasks = Some(tasks), executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)), - killedTasksSummary = stage.killedTasksSummary) + killedTasksSummary = stage.killedTasksSummary, + resourceProfileId = stage.resourceProfileId) } def rdd(rddId: Int): v1.RDDStorageInfo = { diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala index 2714f30de14f0..86cb4fe138773 100644 --- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala +++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala @@ -28,7 +28,7 @@ import com.google.common.collect.Interners import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics} -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest} import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo} import org.apache.spark.status.api.v1 import org.apache.spark.storage.{RDDInfo, StorageLevel} @@ -245,6 +245,21 @@ private class LiveTask( } +private class LiveResourceProfile( + val resourceProfileId: Int, + val executorResources: Map[String, ExecutorResourceRequest], + val taskResources: Map[String, TaskResourceRequest], + val maxTasksPerExecutor: Option[Int]) extends LiveEntity { + + def toApi(): v1.ResourceProfileInfo = { + new v1.ResourceProfileInfo(resourceProfileId, executorResources, taskResources) + } + + override protected def doUpdate(): Any = { + new ResourceProfileWrapper(toApi()) + } +} + private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { var hostPort: String = null @@ -285,6 +300,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend var usedOnHeap = 0L var usedOffHeap = 0L + var resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID + def hasMemoryInfo: Boolean = totalOnHeap >= 0L // peak values for executor level metrics @@ -327,7 +344,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend blacklistedInStages, Some(peakExecutorMetrics).filter(_.isSet), attributes, - resources) + resources, + resourceProfileId) new ExecutorSummaryWrapper(info) } } @@ -465,7 +483,8 @@ private class LiveStage extends LiveEntity { accumulatorUpdates = newAccumulatorInfos(info.accumulables.values), tasks = None, executorSummary = None, - killedTasksSummary = killedSummary) + killedTasksSummary = killedSummary, + resourceProfileId = info.resourceProfileId) } override protected def doUpdate(): Any = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala index cf5c759bebdbb..e0c85fdf6fb5d 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneApplicationResource.scala @@ -101,12 +101,14 @@ private[v1] class AbstractApplicationResource extends BaseAppResource { @Path("environment") def environmentInfo(): 
ApplicationEnvironmentInfo = withUI { ui => val envInfo = ui.store.environmentInfo() + val resourceProfileInfo = ui.store.resourceProfileInfo() new v1.ApplicationEnvironmentInfo( envInfo.runtime, Utils.redact(ui.conf, envInfo.sparkProperties), Utils.redact(ui.conf, envInfo.hadoopProperties), Utils.redact(ui.conf, envInfo.systemProperties), - envInfo.classpathEntries) + envInfo.classpathEntries, + resourceProfileInfo) } @GET diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index 5ec9b36393764..e89e29101a126 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -30,7 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType -import org.apache.spark.resource.ResourceInformation +import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest} case class ApplicationInfo private[spark]( id: String, @@ -62,6 +62,11 @@ case class ApplicationAttemptInfo private[spark]( } +class ResourceProfileInfo private[spark]( + val id: Int, + val executorResources: Map[String, ExecutorResourceRequest], + val taskResources: Map[String, TaskResourceRequest]) + class ExecutorStageSummary private[spark]( val taskTime : Long, val failedTasks : Int, @@ -109,7 +114,8 @@ class ExecutorSummary private[spark]( @JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer]) val peakMemoryMetrics: Option[ExecutorMetrics], val attributes: Map[String, String], - val resources: Map[String, ResourceInformation]) + val resources: Map[String, ResourceInformation], + val resourceProfileId: Int) class MemoryMetrics private[spark]( val usedOnHeapStorageMemory: Long, @@ -252,7 +258,8 @@ class StageData private[spark]( val accumulatorUpdates: Seq[AccumulableInfo], val tasks: Option[Map[Long, TaskData]], val executorSummary: Option[Map[String, ExecutorStageSummary]], - val killedTasksSummary: Map[String, Int]) + val killedTasksSummary: Map[String, Int], + val resourceProfileId: Int) class TaskData private[spark]( val taskId: Long, @@ -365,12 +372,15 @@ class AccumulableInfo private[spark]( class VersionInfo private[spark]( val spark: String) +// Note the resourceProfiles information are only added here on return from the +// REST call, they are not stored with it. 
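
Illustrative only, not part of the diff: because resourceProfiles is attached to the environment payload returned above, an external monitor can discover profiles through the existing REST endpoint. A rough sketch; the UI address and application id are placeholders, and the JSON field name simply mirrors ApplicationEnvironmentInfo.resourceProfiles:

import scala.io.Source

// /api/v1 is Spark's standard REST root; the port and app id below are made up.
val url = "http://localhost:4040/api/v1/applications/local-1234567890123/environment"
val body = Source.fromURL(url).mkString

// With this change the response should also carry a "resourceProfiles" section
// alongside runtime, sparkProperties, systemProperties and classpathEntries.
println(body.contains("resourceProfiles"))
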
class ApplicationEnvironmentInfo private[spark] ( val runtime: RuntimeInfo, val sparkProperties: Seq[(String, String)], val hadoopProperties: Seq[(String, String)], val systemProperties: Seq[(String, String)], - val classpathEntries: Seq[(String, String)]) + val classpathEntries: Seq[(String, String)], + val resourceProfiles: Seq[ResourceProfileInfo]) class RuntimeInfo private[spark]( val javaVersion: String, diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index c957ff75a501f..b40f7304b7ce2 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -374,6 +374,13 @@ private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) { } +private[spark] class ResourceProfileWrapper(val rpInfo: ResourceProfileInfo) { + + @JsonIgnore @KVIndex + def id: Int = rpInfo.id + +} + private[spark] class ExecutorStageSummaryWrapper( val stageId: Int, val stageAttemptId: Int, diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index c6eb461ad601c..2f5b73118927b 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -19,9 +19,11 @@ package org.apache.spark.ui.env import javax.servlet.http.HttpServletRequest +import scala.collection.mutable.StringBuilder import scala.xml.Node import org.apache.spark.SparkConf +import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest} import org.apache.spark.status.AppStatusStore import org.apache.spark.ui._ import org.apache.spark.util.Utils @@ -38,6 +40,37 @@ private[ui] class EnvironmentPage( "Java Home" -> appEnv.runtime.javaHome, "Scala Version" -> appEnv.runtime.scalaVersion) + def constructExecutorRequestString(execReqs: Map[String, ExecutorResourceRequest]): String = { + execReqs.map { + case (_, execReq) => + val execStr = new StringBuilder(s"\t${execReq.resourceName}: [amount: ${execReq.amount}") + if (execReq.discoveryScript.nonEmpty) { + execStr ++= s", discovery: ${execReq.discoveryScript}" + } + if (execReq.vendor.nonEmpty) { + execStr ++= s", vendor: ${execReq.vendor}" + } + execStr ++= "]" + execStr.toString() + }.mkString("\n") + } + + def constructTaskRequestString(taskReqs: Map[String, TaskResourceRequest]): String = { + taskReqs.map { + case (_, taskReq) => s"\t${taskReq.resourceName}: [amount: ${taskReq.amount}]" + }.mkString("\n") + } + + val resourceProfileInfo = store.resourceProfileInfo().map { rinfo => + val einfo = constructExecutorRequestString(rinfo.executorResources) + val tinfo = constructTaskRequestString(rinfo.taskResources) + val res = s"Executor Reqs:\n$einfo\nTask Reqs:\n$tinfo" + (rinfo.id.toString, res) + }.toMap + + val resourceProfileInformationTable = UIUtils.listingTable(resourceProfileHeader, + jvmRowDataPre, resourceProfileInfo.toSeq.sortWith(_._1.toInt < _._1.toInt), + fixedWidth = true, headerClasses = headerClassesNoSortValues) val runtimeInformationTable = UIUtils.listingTable( propertyHeader, jvmRow, jvmInformation.toSeq.sorted, fixedWidth = true, headerClasses = headerClasses) @@ -77,6 +110,17 @@ private[ui] class EnvironmentPage(
{sparkPropertiesTable}
+ +

+ + Resource Profiles +

+
+
+ {resourceProfileInformationTable} +
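
Illustrative only: the table above shows just the default profile unless the application registers additional ones. A sketch of how such a profile might be built with the resource-profile builder API (ResourceProfileBuilder, ExecutorResourceRequests, TaskResourceRequests); those classes come from the wider stage-level scheduling work and are assumed here rather than added by this diff:

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Ask for executors with 4 cores and 2 GPUs, and 1 GPU per task.
// The discovery-script path is a placeholder.
val execReqs = new ExecutorResourceRequests().cores(4).resource("gpu", 2, "/opt/getGpus.sh")
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rprof = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

// Once the profile is attached to an RDD (rdd.withResources(rprof)) it is
// registered with the ResourceProfileManager, which now posts
// SparkListenerResourceProfileAdded, so it appears both in this Resource
// Profiles table and as a Resource Profile Id on the Executors page.
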
@@ -115,10 +159,14 @@ private[ui] class EnvironmentPage( UIUtils.headerSparkPage(request, "Environment", content, parent) } + private def resourceProfileHeader = Seq("Resource Profile Id", "Resource Profile Contents") private def propertyHeader = Seq("Name", "Value") private def classPathHeader = Seq("Resource", "Source") private def headerClasses = Seq("sorttable_alpha", "sorttable_alpha") + private def headerClassesNoSortValues = Seq("sorttable_numeric", "sorttable_nosort") + private def jvmRowDataPre(kv: (String, String)) = + {kv._1}
{kv._2}
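
Illustrative only: for a hypothetical profile whose executor request is 2 GPUs with a discovery script and whose task request is 1 GPU, constructExecutorRequestString and constructTaskRequestString above would fill the pre-formatted "Resource Profile Contents" cell roughly as:

Executor Reqs:
	gpu: [amount: 2, discovery: /opt/getGpus.sh]
Task Reqs:
	gpu: [amount: 1.0]

(The task amount prints as a double because TaskResourceRequest.amount is a Double; the vendor field is omitted when empty.)
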
private def jvmRow(kv: (String, String)) = {kv._1}{kv._2} private def propertyRow(kv: (String, String)) = {kv._1}{kv._2} private def classPathRow(data: (String, String)) = {data._1}{data._2} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala index 9be7124adcf7b..542dc39eee4f0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala @@ -26,6 +26,7 @@ import scala.xml.{Node, NodeSeq, Unparsed, Utility} import org.apache.commons.text.StringEscapeUtils import org.apache.spark.JobExecutionStatus +import org.apache.spark.resource.ResourceProfile import org.apache.spark.status.AppStatusStore import org.apache.spark.status.api.v1 import org.apache.spark.ui._ @@ -253,7 +254,8 @@ private[ui] class JobPage(parent: JobsTab, store: AppStatusStore) extends WebUIP accumulatorUpdates = Nil, tasks = None, executorSummary = None, - killedTasksSummary = Map()) + killedTasksSummary = Map(), + ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 7973d30493a5a..1b072274541c8 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -142,6 +142,10 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We val summary =