@@ -91,6 +91,7 @@ <h4 class="title-table">Executors</h4>
          Off Heap Storage Memory</span></th>
        <th><span data-toggle="tooltip" data-placement="top" title="Disk Used">Disk Used</span></th>
        <th><span data-toggle="tooltip" data-placement="top" title="Cores">Cores</span></th>
+       <th><span data-toggle="tooltip" data-placement="top" title="Resources">Resources</span></th>
        <th><span data-toggle="tooltip" data-placement="top" title="Active Tasks">Active Tasks</span></th>
        <th><span data-toggle="tooltip" data-placement="top" title="Failed Tasks">Failed Tasks</span></th>
        <th><span data-toggle="tooltip" data-placement="top" title="Complete Tasks">Complete Tasks</span></th>
@@ -39,6 +39,19 @@ function formatStatus(status, type, row) {
  return "Dead"
}

+function formatResourceCells(resources) {
Member: It seems a little crowded in the screenshot. It would be better if we could split this single resource cell into several cells by resource type, but I guess that may require more changes? Maybe adding \n after each resource type would be good enough? Or did you do this intentionally so the cell occupies less space (since splitting brings more unnecessary space to the other columns, which would look more sparse)?

Contributor Author: I was trying to take up less space. The linked PR adds checkboxes to show columns, so we could break this up more if it's not on by default, but I think at this point we should start with something small and then enhance later as people figure out what is useful.
+  var result = ""
+  var count = 0
+  $.each(resources, function (name, resInfo) {
+    if (count > 0) {
+      result += ", "
+    }
+    result += name + ': [' + resInfo.addresses.join(", ") + ']'
+    count += 1
+  });
+  return result
+}

jQuery.extend(jQuery.fn.dataTableExt.oSort, {
"title-numeric-pre": function (a) {
var x = a.match(/title="*(-?[0-9\.]+)/)[1];
@@ -65,6 +78,12 @@ function logsExist(execs) {
});
}

+function resourcesExist(execs) {
+  return execs.some(function(exec) {
+    return !($.isEmptyObject(exec["resources"]));
+  });
+}

// Determine Color Opacity from 0.5-1
// activeTasks range from 0 to maxTasks
function activeTasksAlpha(activeTasks, maxTasks) {
@@ -386,6 +405,7 @@ $(document).ready(function () {
},
{data: 'diskUsed', render: formatBytes},
{data: 'totalCores'},
+    {name: 'resourcesCol', data: 'resources', render: formatResourceCells, orderable: false},
{
data: 'activeTasks',
"fnCreatedCell": function (nTd, sData, oData, iRow, iCol) {
@@ -434,6 +454,7 @@ $(document).ready(function () {
var dt = $(selector).DataTable(conf);
dt.column('executorLogsCol:name').visible(logsExist(response));
dt.column('threadDumpCol:name').visible(getThreadDumpEnabled());
+  dt.column('resourcesCol:name').visible(resourcesExist(response));
$('#active-executors [data-toggle="tooltip"]').tooltip();

var sumSelector = "#summary-execs-table";
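For reference, `formatResourceCells` renders each executor's resources map as one comma-separated cell, and `resourcesExist` hides the column when no executor reports resources. A minimal Scala sketch of the same formatting rule, with hypothetical resource names and addresses:

```scala
// Mirrors the JS formatResourceCells: one "name: [a, b]" fragment per resource.
def formatResourceCells(resources: Map[String, Seq[String]]): String =
  resources
    .map { case (name, addrs) => s"$name: [${addrs.mkString(", ")}]" }
    .mkString(", ")

// Map("gpu" -> Seq("0", "1"), "fpga" -> Seq("f0"))  =>  "gpu: [0, 1], fpga: [f0]"
```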
@@ -72,7 +72,7 @@ private[spark] class HistoryAppStatusStore(
source.totalGCTime, source.totalInputBytes, source.totalShuffleRead,
source.totalShuffleWrite, source.isBlacklisted, source.maxMemory, source.addTime,
source.removeTime, source.removeReason, newExecutorLogs, source.memoryMetrics,
-      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes)
+      source.blacklistedInStages, source.peakMemoryMetrics, source.attributes, source.resources)
}

}
@@ -52,6 +52,8 @@ class ResourceInformation(
}

override def hashCode(): Int = Seq(name, addresses.toSeq).hashCode()

+  def toJson(): JValue = ResourceInformationJson(name, addresses).toJValue
}

private[spark] object ResourceInformation {
@@ -72,6 +74,16 @@ private[spark] object ResourceInformation {
s"Here is a correct example: $exampleJson.", e)
}
}

+  def parseJson(json: JValue): ResourceInformation = {
+    implicit val formats = DefaultFormats
+    try {
+      json.extract[ResourceInformationJson].toResourceInformation
+    } catch {
+      case NonFatal(e) =>
+        throw new SparkException(s"Error parsing JSON into ResourceInformation:\n$json\n", e)
+    }
+  }
}

/** A case class to simplify JSON serialization of [[ResourceInformation]]. */
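Together, `toJson` and `parseJson` give a lossless round trip through json4s. A minimal sketch with hypothetical values, assuming it runs from code inside the `org.apache.spark` package (the companion object is `private[spark]`):

```scala
import org.apache.spark.resource.ResourceInformation

// Round-trip a hypothetical GPU resource through the new JSON helpers.
val info = new ResourceInformation("gpu", Array("0", "1"))
val json = info.toJson()   // {"name":"gpu","addresses":["0","1"]}
val parsed = ResourceInformation.parseJson(json)
assert(parsed == info)     // equality is by name and addresses
```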
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import scala.collection.mutable

import org.apache.spark.SparkException
+import org.apache.spark.resource.ResourceInformation
import org.apache.spark.util.collection.OpenHashMap

/**
@@ -31,8 +32,8 @@ import org.apache.spark.util.collection.OpenHashMap
* @param addresses Resource addresses provided by the executor
*/
private[spark] class ExecutorResourceInfo(
-    val name: String,
-    addresses: Seq[String]) extends Serializable {
+    name: String,
+    addresses: Seq[String]) extends ResourceInformation(name, addresses.toArray) with Serializable {

/**
* Map from an address to its availability, the value `true` means the address is available,
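Making `ExecutorResourceInfo` a subclass means the scheduler's per-executor bookkeeping object can be passed anywhere a plain `ResourceInformation` is expected (e.g. into `ExecutorInfo.resourcesInfo` below). A sketch with hypothetical values, under the same `private[spark]` access assumption:

```scala
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.ExecutorResourceInfo

val execGpu = new ExecutorResourceInfo("gpu", Seq("0", "1"))
val asInfo: ResourceInformation = execGpu   // upcast now allowed
assert(asInfo.addresses.sameElements(Array("0", "1")))
```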
@@ -31,12 +31,12 @@ import org.apache.spark.scheduler.ExecutorResourceInfo
* @param resourcesInfo The information of the currently available resources on the executor
*/
private[cluster] class ExecutorData(
-    val executorEndpoint: RpcEndpointRef,
-    val executorAddress: RpcAddress,
-    override val executorHost: String,
-    var freeCores: Int,
-    override val totalCores: Int,
-    override val logUrlMap: Map[String, String],
-    override val attributes: Map[String, String],
-    val resourcesInfo: Map[String, ExecutorResourceInfo]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes)
+    val executorEndpoint: RpcEndpointRef,
+    val executorAddress: RpcAddress,
+    override val executorHost: String,
+    var freeCores: Int,
+    override val totalCores: Int,
+    override val logUrlMap: Map[String, String],
+    override val attributes: Map[String, String],
+    override val resourcesInfo: Map[String, ExecutorResourceInfo]
+) extends ExecutorInfo(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
@@ -17,20 +17,30 @@
package org.apache.spark.scheduler.cluster

import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.resource.ResourceInformation

/**
* :: DeveloperApi ::
* Stores information about an executor to pass from the scheduler to SparkListeners.
*/
@DeveloperApi
class ExecutorInfo(
-    val executorHost: String,
-    val totalCores: Int,
-    val logUrlMap: Map[String, String],
-    val attributes: Map[String, String]) {
+    val executorHost: String,
+    val totalCores: Int,
+    val logUrlMap: Map[String, String],
+    val attributes: Map[String, String],
+    val resourcesInfo: Map[String, ResourceInformation]) {

  def this(executorHost: String, totalCores: Int, logUrlMap: Map[String, String]) = {
-    this(executorHost, totalCores, logUrlMap, Map.empty)
+    this(executorHost, totalCores, logUrlMap, Map.empty, Map.empty)
}

+  def this(
+      executorHost: String,
+      totalCores: Int,
+      logUrlMap: Map[String, String],
+      attributes: Map[String, String]) = {
+    this(executorHost, totalCores, logUrlMap, attributes, Map.empty)
+  }

def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
@@ -41,12 +51,13 @@ class ExecutorInfo(
executorHost == that.executorHost &&
totalCores == that.totalCores &&
logUrlMap == that.logUrlMap &&
-      attributes == that.attributes
+      attributes == that.attributes &&
+      resourcesInfo == that.resourcesInfo
case _ => false
}

override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap, attributes)
+    val state = Seq(executorHost, totalCores, logUrlMap, attributes, resourcesInfo)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
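The extra auxiliary constructor keeps existing `ExecutorInfo` call sites compiling unchanged, while new call sites can attach resources. A sketch of both paths with hypothetical values:

```scala
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.cluster.ExecutorInfo

// Old signature still works; resourcesInfo defaults to Map.empty.
val legacy = new ExecutorInfo("host1", 8, Map.empty, Map.empty)
assert(legacy.resourcesInfo.isEmpty)

// New signature carries the discovered resources.
val withGpu = new ExecutorInfo("host1", 8, Map.empty, Map.empty,
  Map("gpu" -> new ResourceInformation("gpu", Array("0", "1"))))
assert(withGpu.resourcesInfo("gpu").addresses.sameElements(Array("0", "1")))
```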
@@ -131,7 +131,7 @@ private[spark] class LocalSchedulerBackend(
System.currentTimeMillis,
executorEndpoint.localExecutorId,
new ExecutorInfo(executorEndpoint.localExecutorHostname, totalCores, Map.empty,
-        Map.empty)))
+        Map.empty, Map.empty)))
launcherBackend.setAppId(appId)
launcherBackend.setState(SparkAppHandle.State.RUNNING)
}
@@ -199,6 +199,7 @@ private[spark] class AppStatusListener(
exec.totalCores = event.executorInfo.totalCores
exec.maxTasks = event.executorInfo.totalCores / coresPerTask
exec.executorLogs = event.executorInfo.logUrlMap
+    exec.resources = event.executorInfo.resourcesInfo
exec.attributes = event.executorInfo.attributes
liveUpdate(exec, System.nanoTime())
}
core/src/main/scala/org/apache/spark/status/LiveEntity.scala (4 additions, 1 deletion)
@@ -27,6 +27,7 @@ import com.google.common.collect.Interners

import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
+import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
import org.apache.spark.status.api.v1
import org.apache.spark.storage.RDDInfo
@@ -259,6 +260,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE

var executorLogs = Map[String, String]()
var attributes = Map[String, String]()
+  var resources = Map[String, ResourceInformation]()

// Memory metrics. They may not be recorded (e.g. old event logs) so if totalOnHeap is not
// initialized, the store will not contain this information.
@@ -308,7 +310,8 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
memoryMetrics,
blacklistedInStages,
Some(peakExecutorMetrics).filter(_.isSet),
-      attributes)
+      attributes,
+      resources)
new ExecutorSummaryWrapper(info)
}
}
core/src/main/scala/org/apache/spark/status/api/v1/api.scala (3 additions, 1 deletion)
@@ -30,6 +30,7 @@ import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonSerialize
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.resource.ResourceInformation

case class ApplicationInfo private[spark](
id: String,
@@ -107,7 +108,8 @@ class ExecutorSummary private[spark](
@JsonSerialize(using = classOf[ExecutorMetricsJsonSerializer])
@JsonDeserialize(using = classOf[ExecutorMetricsJsonDeserializer])
val peakMemoryMetrics: Option[ExecutorMetrics],
-    val attributes: Map[String, String])
+    val attributes: Map[String, String],
+    val resources: Map[String, ResourceInformation])

class MemoryMetrics private[spark](
val usedOnHeapStorageMemory: Long,
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (23 additions, 2 deletions)
@@ -33,6 +33,7 @@ import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.rdd.RDDOperationScope
+import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
@@ -488,7 +489,15 @@
("Host" -> executorInfo.executorHost) ~
("Total Cores" -> executorInfo.totalCores) ~
("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
("Attributes" -> mapToJson(executorInfo.attributes))
("Attributes" -> mapToJson(executorInfo.attributes)) ~
("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo))
}

+  def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
+    val jsonFields = m.map {
+      case (k, v) => JField(k, v.toJson)
+    }
+    JObject(jsonFields.toList)
+  }

def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -1069,7 +1078,11 @@
case Some(attr) => mapFromJson(attr).toMap
case None => Map.empty[String, String]
}
-    new ExecutorInfo(executorHost, totalCores, logUrls, attributes)
+    val resources = jsonOption(json \ "Resources") match {
+      case Some(resources) => resourcesMapFromJson(resources).toMap
+      case None => Map.empty[String, ResourceInformation]
+    }
+    new ExecutorInfo(executorHost, totalCores, logUrls, attributes, resources)
}

def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
@@ -1085,6 +1098,14 @@
* Util JSON deserialization methods |
* --------------------------------- */

+  def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
+    val jsonFields = json.asInstanceOf[JObject].obj
+    jsonFields.map { case JField(k, v) =>
+      val resourceInfo = ResourceInformation.parseJson(v)
+      (k, resourceInfo)
+    }.toMap
+  }

def mapFromJson(json: JValue): Map[String, String] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
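A sketch of the event-log shape these helpers produce and consume, with hypothetical values (`JsonProtocol` is `private[spark]`, so this assumes test code inside that package):

```scala
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.JsonProtocol

// Serialize an ExecutorInfo carrying one hypothetical GPU, then parse it back.
val info = new ExecutorInfo("host1", 8, Map.empty, Map.empty,
  Map("gpu" -> new ResourceInformation("gpu", Array("0", "1"))))
val json = JsonProtocol.executorInfoToJson(info)
// json \ "Resources" renders as: {"gpu":{"name":"gpu","addresses":["0","1"]}}
assert(JsonProtocol.executorInfoFromJson(json) == info)
```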
@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id": "application_1536831636016_59384",
"name": "Spark Pi",
"attempts": [
@@ -1,4 +1,19 @@
[ {
"id" : "application_1555004656427_0144",
"name" : "Spark shell",
"attempts" : [ {
"startTime" : "2019-07-02T21:02:17.180GMT",
"endTime" : "2019-07-02T21:02:35.974GMT",
"lastUpdated" : "",
"duration" : 18794,
"sparkUser" : "tgraves",
"completed" : true,
"appSparkVersion" : "3.0.0-SNAPSHOT",
"startTimeEpoch" : 1562101337180,
"lastUpdatedEpoch" : 0,
"endTimeEpoch" : 1562101355974
} ]
}, {
"id": "application_1536831636016_59384",
"name": "Spark Pi",
"attempts": [
@@ -21,5 +21,6 @@
"addTime" : "2015-02-03T16:43:00.906GMT",
"executorLogs" : { },
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
} ]
@@ -52,7 +52,8 @@
"MajorGCCount" : 3,
"MajorGCTime" : 170
},
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "2",
"hostPort" : "028.company.com:46325",
@@ -85,7 +86,8 @@
"totalOffHeapStorageMemory" : 0
},
"blacklistedInStages" : [ ],
"attributes" : { }
"attributes" : { },
"resources" : { }
}, {
"id" : "1",
"hostPort" : "036.company.com:35126",
@@ -118,5 +120,6 @@
"totalOffHeapStorageMemory" : 0
},
"blacklistedInStages" : [ ],
"attributes" : { }
} ]
"attributes" : { },
"resources" : { }
} ]