Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ public void onSpeculativeTaskSubmitted(SparkListenerSpeculativeTaskSubmitted spe
onEvent(speculativeTask);
}

@Override
public void onResourceProfileAdded(SparkListenerResourceProfileAdded event) {
onEvent(event);
}

@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ <h4 class="title-table">Executors</h4>
<th>Disk Used</th>
<th>Cores</th>
<th>Resources</th>
<th>Resource Profile Id</th>
<th><span data-toggle="tooltip" data-placement="top" title="Number of tasks currently executing. Darker shading highlights executors with more active tasks.">Active Tasks</span></th>
<th><span data-toggle="tooltip" data-placement="top" title="Number of tasks that have failed on this executor. Darker shading highlights executors with a high proportion of failed tasks.">Failed Tasks</span></th>
<th>Complete Tasks</th>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}

var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6, 9];
var execOptionalColumns = [5, 6, 9, 10];
var execDataTable;
var sumDataTable;

Expand Down Expand Up @@ -415,6 +415,7 @@ $(document).ready(function () {
{data: 'diskUsed', render: formatBytes},
{data: 'totalCores'},
{name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
{name: 'resourceProfileIdCol', data: 'resourceProfileId'},
{
data: 'activeTasks',
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
Expand Down Expand Up @@ -461,7 +462,8 @@ $(document).ready(function () {
"columnDefs": [
{"visible": false, "targets": 5},
{"visible": false, "targets": 6},
{"visible": false, "targets": 9}
{"visible": false, "targets": 9},
{"visible": false, "targets": 10}
],
"deferRender": true
};
Expand Down Expand Up @@ -570,6 +572,7 @@ $(document).ready(function () {
"<div id='on_heap_memory' class='on-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='3' data-exec-col-idx='5'>On Heap Memory</div>" +
"<div id='off_heap_memory' class='off-heap-memory-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='4' data-exec-col-idx='6'>Off Heap Memory</div>" +
"<div id='extra_resources' class='resources-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='9'>Resources</div>" +
"<div id='resource_prof_id' class='resource-prof-id-checkbox-div'><input type='checkbox' class='toggle-vis' data-sum-col-idx='' data-exec-col-idx='10'>Resource Profile Id</div>" +
"</div>");

reselectCheckboxesBasedOnTaskTableState();
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

_listenerBus = new LiveListenerBus(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf)
_resourceProfileManager = new ResourceProfileManager(_conf, _listenerBus)

// Initialize the app status store and listener before SparkEnv is created so that it gets
// all events.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ private[spark] class HistoryAppStatusStore(
source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources,
source.resourceProfileId)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,19 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.Evolving
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Tests._
import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerResourceProfileAdded}
import org.apache.spark.util.Utils
import org.apache.spark.util.Utils.isTesting

/**
* Manager of resource profiles. The manager allows one place to keep the actual ResourceProfiles
* and everywhere else we can use the ResourceProfile Id to save on space.
* Note we never remove a resource profile at this point. Its expected this number if small
* Note we never remove a resource profile at this point. Its expected this number is small
* so this shouldn't be much overhead.
*/
@Evolving
private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Logging {
private[spark] class ResourceProfileManager(sparkConf: SparkConf,
listenerBus: LiveListenerBus) extends Logging {
private val resourceProfileIdToResourceProfile = new HashMap[Int, ResourceProfile]()

private val (readLock, writeLock) = {
Expand Down Expand Up @@ -83,6 +85,7 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf) extends Loggin
// force the computation of maxTasks and limitingResource now so we don't have cost later
rp.limitingResource(sparkConf)
logInfo(s"Added ResourceProfile id: ${rp.id}")
listenerBus.post(SparkListenerResourceProfileAdded(rp))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,10 @@ private[spark] class EventLoggingListener(
}
}

override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
logEvent(event, flushLogger = true)
}

override def onOtherEvent(event: SparkListenerEvent): Unit = {
if (event.logEvent) {
logEvent(event, flushLogger = true)
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.TaskEndReason
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}

Expand Down Expand Up @@ -207,6 +208,10 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
@DeveloperApi
case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerResourceProfileAdded(resourceProfile: ResourceProfile)
extends SparkListenerEvent

/**
* Interface for listening to events from the Spark scheduler. Most applications should probably
* extend SparkListener or SparkFirehoseListener directly, rather than implementing this class.
Expand Down Expand Up @@ -348,6 +353,11 @@ private[spark] trait SparkListenerInterface {
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent): Unit

/**
* Called when a Resource Profile is added to the manager.
*/
def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit
}


Expand Down Expand Up @@ -421,4 +431,6 @@ abstract class SparkListener extends SparkListenerInterface {
speculativeTask: SparkListenerSpeculativeTaskSubmitted): Unit = { }

override def onOtherEvent(event: SparkListenerEvent): Unit = { }

override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ private[spark] trait SparkListenerBus
listener.onBlockUpdated(blockUpdated)
case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
case resourceProfileAdded: SparkListenerResourceProfileAdded =>
listener.onResourceProfileAdded(resourceProfileAdded)
case _ => listener.onOtherEvent(event)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.CPUS_PER_TASK
import org.apache.spark.internal.config.Status._
import org.apache.spark.resource.ResourceProfile.CPUS
import org.apache.spark.scheduler._
import org.apache.spark.status.api.v1
import org.apache.spark.storage._
Expand All @@ -51,7 +52,7 @@ private[spark] class AppStatusListener(
private var sparkVersion = SPARK_VERSION
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1
private var defaultCpusPerTask: Int = 1

// How often to update live entities. -1 means "never update" when replaying applications,
// meaning only the last write will happen. For live applications, this avoids a few
Expand All @@ -76,6 +77,7 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
private val liveResourceProfiles = new HashMap[Int, LiveResourceProfile]()

private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
// Keep the active executor count as a separate variable to avoid having to do synchronization
Expand Down Expand Up @@ -145,6 +147,20 @@ private[spark] class AppStatusListener(
}
}

override def onResourceProfileAdded(event: SparkListenerResourceProfileAdded): Unit = {
val maxTasks = if (event.resourceProfile.isCoresLimitKnown) {
Some(event.resourceProfile.maxTasksPerExecutor(conf))
} else {
None
}
val liveRP = new LiveResourceProfile(event.resourceProfile.id,
event.resourceProfile.executorResources, event.resourceProfile.taskResources, maxTasks)
liveResourceProfiles(event.resourceProfile.id) = liveRP
val rpInfo = new v1.ResourceProfileInfo(liveRP.resourceProfileId,
liveRP.executorResources, liveRP.taskResources)
kvstore.write(new ResourceProfileWrapper(rpInfo))
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val details = event.environmentDetails

Expand All @@ -159,10 +175,11 @@ private[spark] class AppStatusListener(
details.getOrElse("Spark Properties", Nil),
details.getOrElse("Hadoop Properties", Nil),
details.getOrElse("System Properties", Nil),
details.getOrElse("Classpath Entries", Nil))
details.getOrElse("Classpath Entries", Nil),
Nil)

coresPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
.getOrElse(coresPerTask)
defaultCpusPerTask = envInfo.sparkProperties.toMap.get(CPUS_PER_TASK.key).map(_.toInt)
.getOrElse(defaultCpusPerTask)

kvstore.write(new ApplicationEnvironmentInfoWrapper(envInfo))
}
Expand Down Expand Up @@ -197,10 +214,16 @@ private[spark] class AppStatusListener(
exec.host = event.executorInfo.executorHost
exec.isActive = true
exec.totalCores = event.executorInfo.totalCores
exec.maxTasks = event.executorInfo.totalCores / coresPerTask
val rpId = event.executorInfo.resourceProfileId
val liveRP = liveResourceProfiles.get(rpId)
val cpusPerTask = liveRP.flatMap(_.taskResources.get(CPUS))
.map(_.amount.toInt).getOrElse(defaultCpusPerTask)
val maxTasksPerExec = liveRP.flatMap(_.maxTasksPerExecutor)
exec.maxTasks = maxTasksPerExec.getOrElse(event.executorInfo.totalCores / cpusPerTask)
exec.executorLogs = event.executorInfo.logUrlMap
exec.resources = event.executorInfo.resourcesInfo
exec.attributes = event.executorInfo.attributes
exec.resourceProfileId = rpId
liveUpdate(exec, System.nanoTime())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf, SparkException}
import org.apache.spark.resource.ResourceProfileManager
import org.apache.spark.status.api.v1
import org.apache.spark.ui.scope._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -51,6 +52,10 @@ private[spark] class AppStatusStore(
store.read(klass, klass.getName()).info
}

def resourceProfileInfo(): Seq[v1.ResourceProfileInfo] = {
store.view(classOf[ResourceProfileWrapper]).asScala.map(_.rpInfo).toSeq
}

def jobsList(statuses: JList[JobExecutionStatus]): Seq[v1.JobData] = {
val it = store.view(classOf[JobDataWrapper]).reverse().asScala.map(_.info)
if (statuses != null && !statuses.isEmpty()) {
Expand Down Expand Up @@ -486,7 +491,8 @@ private[spark] class AppStatusStore(
accumulatorUpdates = stage.accumulatorUpdates,
tasks = Some(tasks),
executorSummary = Some(executorSummary(stage.stageId, stage.attemptId)),
killedTasksSummary = stage.killedTasksSummary)
killedTasksSummary = stage.killedTasksSummary,
resourceProfileId = stage.resourceProfileId)
}

def rdd(rddId: Int): v1.RDDStorageInfo = {
Expand Down
25 changes: 22 additions & 3 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.google.common.collect.Interners

import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, ResourceProfile, TaskResourceRequest}
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.{RDDInfo, StorageLevel}
Expand Down Expand Up @@ -245,6 +245,21 @@ private class LiveTask(

}

private class LiveResourceProfile(
val resourceProfileId: Int,
val executorResources: Map[String, ExecutorResourceRequest],
val taskResources: Map[String, TaskResourceRequest],
val maxTasksPerExecutor: Option[Int]) extends LiveEntity {

def toApi(): v1.ResourceProfileInfo = {
new v1.ResourceProfileInfo(resourceProfileId, executorResources, taskResources)
}

override protected def doUpdate(): Any = {
new ResourceProfileWrapper(toApi())
}
}

private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {

var hostPort: String = null
Expand Down Expand Up @@ -285,6 +300,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
var usedOnHeap = 0L
var usedOffHeap = 0L

var resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID

def hasMemoryInfo: Boolean = totalOnHeap >= 0L

// peak values for executor level metrics
Expand Down Expand Up @@ -327,7 +344,8 @@ private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extend
blacklistedInStages,
Some(peakExecutorMetrics).filter(_.isSet),
attributes,
resources)
resources,
resourceProfileId)
new ExecutorSummaryWrapper(info)
}
}
Expand Down Expand Up @@ -465,7 +483,8 @@ private class LiveStage extends LiveEntity {
accumulatorUpdates = newAccumulatorInfos(info.accumulables.values),
tasks = None,
executorSummary = None,
killedTasksSummary = killedSummary)
killedTasksSummary = killedSummary,
resourceProfileId = info.resourceProfileId)
}

override protected def doUpdate(): Any = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,12 +101,14 @@ private[v1] class AbstractApplicationResource extends BaseAppResource {
@Path("environment")
def environmentInfo(): ApplicationEnvironmentInfo = withUI { ui =>
val envInfo = ui.store.environmentInfo()
val resourceProfileInfo = ui.store.resourceProfileInfo()
new v1.ApplicationEnvironmentInfo(
envInfo.runtime,
Utils.redact(ui.conf, envInfo.sparkProperties),
Utils.redact(ui.conf, envInfo.hadoopProperties),
Utils.redact(ui.conf, envInfo.systemProperties),
envInfo.classpathEntries)
envInfo.classpathEntries,
resourceProfileInfo)
}

@GET
Expand Down
Loading