
Commit 343af9d

ResourceInformation API design

1 parent dcca830

File tree

5 files changed: +111, -18 lines
core/src/main/scala/org/apache/spark/scheduler/ResourceInformation.scala

Lines changed: 44 additions & 0 deletions
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Developer API that describes a resource, such as a GPU or an FPGA. This
+ * information is provided to TaskInfo so that a task can leverage it to schedule
+ * embedded jobs (such as MPI) that require additional resource information.
+ *
+ * @param tpe The type of the resource, like "/cpu", "/gpu/k80", "/gpu/p100".
+ * @param id The id of the resource, if provided, such as the id of a GPU card.
+ * @param spec Detailed information about the resource, if provided, such as the GPU spec,
+ *             VRAM size and so on.
+ */
+@DeveloperApi
+case class ResourceInformation(tpe: String, id: String = "N/A", spec: String = "N/A") {
+
+  private[spark] var occupiedByTask: Long = ResourceInformation.UNUSED
+
+  override def toString: String = s"$tpe id: $id and spec: $spec"
+}
+
+private[spark] object ResourceInformation {
+  val UNUSED = -1L
+}
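For orientation, a minimal usage sketch of the new case class (not part of the commit; the GPU spec string is an illustrative value, and occupiedByTask is private[spark], so the last line only compiles inside the org.apache.spark package):

  // Only the resource type is required; id and spec default to "N/A".
  val gpu = ResourceInformation("/gpu/k80", id = "0", spec = "12GB VRAM")
  val fpga = ResourceInformation("/fpga")

  println(gpu)   // /gpu/k80 id: 0 and spec: 12GB VRAM
  println(fpga)  // /fpga id: N/A and spec: N/A

  // A freshly constructed resource is not yet claimed by any task.
  assert(fpga.occupiedByTask == ResourceInformation.UNUSED)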

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala

Lines changed: 10 additions & 7 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
+import org.apache.spark.scheduler.ResourceInformation
 
 /**
  * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
@@ -29,10 +30,12 @@ import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
  * @param totalCores The total number of cores available to the executor
  */
 private[cluster] class ExecutorData(
-   val executorEndpoint: RpcEndpointRef,
-   val executorAddress: RpcAddress,
-   override val executorHost: String,
-   var freeCores: Int,
-   override val totalCores: Int,
-   override val logUrlMap: Map[String, String]
-) extends ExecutorInfo(executorHost, totalCores, logUrlMap)
+    val executorEndpoint: RpcEndpointRef,
+    val executorAddress: RpcAddress,
+    override val executorHost: String,
+    var freeCores: Int,
+    override val totalCores: Int,
+    override val logUrlMap: Map[String, String],
+    override val resources: Array[ResourceInformation] = Array.empty)
+  extends ExecutorInfo(executorHost, totalCores, logUrlMap, resources) {
+}
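Because resources defaults to Array.empty, existing call sites that construct ExecutorData without resource information keep compiling unchanged; only scheduler backends that actually discover and report resources need to pass the new argument.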

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorInfo.scala

Lines changed: 8 additions & 5 deletions
@@ -17,16 +17,18 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.scheduler.ResourceInformation
 
 /**
  * :: DeveloperApi ::
  * Stores information about an executor to pass from the scheduler to SparkListeners.
  */
 @DeveloperApi
 class ExecutorInfo(
-   val executorHost: String,
-   val totalCores: Int,
-   val logUrlMap: Map[String, String]) {
+    val executorHost: String,
+    val totalCores: Int,
+    val logUrlMap: Map[String, String],
+    val resources: Array[ResourceInformation] = Array.empty) {
 
   def canEqual(other: Any): Boolean = other.isInstanceOf[ExecutorInfo]
 
@@ -35,12 +37,13 @@ class ExecutorInfo(
       (that canEqual this) &&
         executorHost == that.executorHost &&
         totalCores == that.totalCores &&
-        logUrlMap == that.logUrlMap
+        logUrlMap == that.logUrlMap &&
+        resources.toSet == that.resources.toSet
     case _ => false
   }
 
   override def hashCode(): Int = {
-    val state = Seq(executorHost, totalCores, logUrlMap)
+    val state = Seq(executorHost, totalCores, logUrlMap) ++ resources
     state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
 }
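A note on the equality change above: Scala Arrays inherit Java's reference equality, so comparing the resources arrays directly would only succeed for the very same array instance. Converting to a Set gives an element-wise, order-insensitive comparison that uses the case class's structural equals. A small sketch of the difference:

  val a = Array(ResourceInformation("/gpu/k80", "0"))
  val b = Array(ResourceInformation("/gpu/k80", "0"))

  assert(a != b)              // Array equality is reference equality
  assert(a.toSet == b.toSet)  // Set equality compares elements structurally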

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 19 additions & 2 deletions
@@ -463,7 +463,14 @@ private[spark] object JsonProtocol {
   def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
     ("Host" -> executorInfo.executorHost) ~
     ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap))
+    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
+    ("Resources" -> JArray(executorInfo.resources.map(resourceInformationToJson).toList))
+  }
+
+  def resourceInformationToJson(res: ResourceInformation): JValue = {
+    ("Type" -> res.tpe) ~
+    ("ID" -> res.id) ~
+    ("Spec" -> res.spec)
   }
 
   def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
@@ -1012,7 +1019,17 @@
     val executorHost = (json \ "Host").extract[String]
     val totalCores = (json \ "Total Cores").extract[Int]
     val logUrls = mapFromJson(json \ "Log Urls").toMap
-    new ExecutorInfo(executorHost, totalCores, logUrls)
+    val resources = jsonOption(json \ "Resources").map { l =>
+      l.extract[List[JValue]].map(resourceInformationFromJson).toArray
+    }.getOrElse(Array.empty)
+    new ExecutorInfo(executorHost, totalCores, logUrls, resources)
+  }
+
+  def resourceInformationFromJson(json: JValue): ResourceInformation = {
+    val tpe = (json \ "Type").extract[String]
+    val id = (json \ "ID").extract[String]
+    val spec = (json \ "Spec").extract[String]
+    ResourceInformation(tpe, id, spec)
   }
 
   def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
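A round-trip sketch of the new (de)serialization pair, assuming the json4s imports already used by JsonProtocol (the object is private[spark], so this would have to live inside the org.apache.spark package; host and URL values are illustrative):

  import org.json4s.jackson.JsonMethods.{compact, render}

  val info = new ExecutorInfo("host1", 4, Map("stdout" -> "http://logs/stdout"),
    Array(ResourceInformation("/gpu/k80", "0")))

  val json = JsonProtocol.executorInfoToJson(info)
  // compact(render(json)) yields:
  // {"Host":"host1","Total Cores":4,"Log Urls":{"stdout":"http://logs/stdout"},
  //  "Resources":[{"Type":"/gpu/k80","ID":"0","Spec":"N/A"}]}

  // Deserializing restores an equal ExecutorInfo; equals compares resources.toSet.
  assert(JsonProtocol.executorInfoFromJson(json) == info)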

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 30 additions & 4 deletions
@@ -79,8 +79,10 @@ class JsonProtocolSuite extends SparkFunSuite {
   val applicationStartWithLogs = SparkListenerApplicationStart("The winner of all", Some("appId"),
     42L, "Garfield", Some("appAttempt"), Some(logUrlMap))
   val applicationEnd = SparkListenerApplicationEnd(42L)
+  val resources = Array(ResourceInformation("/gpu/k80", "0"),
+    ResourceInformation("/gpu/p100", "1"))
   val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
-    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
+    new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources))
   val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
   val executorBlacklisted = SparkListenerExecutorBlacklisted(executorBlacklistedTime, "exec1", 22)
   val executorUnblacklisted =
@@ -134,7 +136,9 @@
     testTaskMetrics(makeTaskMetrics(
       33333L, 44444L, 55555L, 66666L, 7, 8, hasHadoopInput = false, hasOutput = false))
     testBlockManagerId(BlockManagerId("Hong", "Kong", 500))
-    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap))
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    testExecutorInfo(new ExecutorInfo("host", 43, logUrlMap, resources))
 
     // StorageLevel
     testStorageLevel(StorageLevel.NONE)
@@ -436,9 +440,18 @@
     testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  test("ExecutorInfo backward compatibility") {
+    val logUrlMap = Map("stderr" -> "mystderr", "stdout" -> "mystdout").toMap
+    val resources = Array(ResourceInformation("/gpu/k80", "0"),
+      ResourceInformation("/gpu/p100", "1"))
+    val executorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, resources)
+    val oldExecutorInfo = JsonProtocol.executorInfoToJson(executorInfo)
+      .removeField({ _._1 == "Resources" })
+    val expectedExecutorInfo = new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap, Array.empty)
+    assertEquals(expectedExecutorInfo, JsonProtocol.executorInfoFromJson(oldExecutorInfo))
+  }
 }
 
-
 private[spark] object JsonProtocolSuite extends Assertions {
   import InternalAccumulator._
 
@@ -620,6 +633,7 @@
   private def assertEquals(info1: ExecutorInfo, info2: ExecutorInfo) {
     assert(info1.executorHost == info2.executorHost)
     assert(info1.totalCores == info2.totalCores)
+    assert(info1.resources.toSet === info2.resources.toSet)
   }
 
   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {
@@ -1782,7 +1796,19 @@
       |    "Log Urls" : {
       |      "stderr" : "mystderr",
       |      "stdout" : "mystdout"
-      |    }
+      |    },
+      |    "Resources": [
+      |      {
+      |        "Type": "/gpu/k80",
+      |        "ID": "0",
+      |        "Spec": "N/A"
+      |      },
+      |      {
+      |        "Type": "/gpu/p100",
+      |        "ID": "1",
+      |        "Spec": "N/A"
+      |      }
+      |    ]
       |  }
       |}
     """.stripMargin
