@jiangxb1987
Contributor

@jiangxb1987 jiangxb1987 commented Apr 15, 2019

What changes were proposed in this pull request?

This PR adds support for scheduling tasks with extra resource requirements (e.g. GPUs) on executors that have those resources available. It also introduces a new method, TaskContext.resources(), so tasks can access the resource addresses allocated to them.
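As a rough illustration of the description above, the sketch below models how a task might look up the addresses assigned to it. `ResourceInformation` here is a stand-in case class that only carries the two fields visible in the review snippets (`name`, `addresses`), and `addressesFor` is a hypothetical helper. The input map plays the role of what `TaskContext.resources()` returns, assuming it is keyed by resource name.

```scala
// Stand-in for the PR's ResourceInformation; only the fields visible in the
// review snippets (name, addresses) are modelled here.
final case class ResourceInformation(name: String, addresses: Seq[String])

object TaskResourcesSketch {
  // Hypothetical helper: look up the addresses a task was given for one
  // resource name, returning an empty list when none were allocated.
  def addressesFor(
      resources: Map[String, ResourceInformation],
      resourceName: String): Seq[String] =
    resources.get(resourceName).map(_.addresses).getOrElse(Seq.empty)
}
```

Inside a real task, the map would come from the task context rather than being built by hand.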

How was this patch tested?

  • Added new end-to-end test cases in SparkContextSuite;
  • Added new test case in CoarseGrainedSchedulerBackendSuite;
  • Added new test case in CoarseGrainedExecutorBackendSuite;
  • Added new test case in TaskSchedulerImplSuite;
  • Added new test case in TaskSetManagerSuite;
  • Updated existing tests.

@SparkQA

SparkQA commented Apr 15, 2019

Test build #104586 has finished for PR 24374 at commit aa0d9ae.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • class ResourceInformation(
      • case class StatusUpdate(

Member

Nit: "GPU" in all cases where it's referring to the hardware

Member

What would units be here -- something like CUDA cores or GPU memory? Below I just see "gpu" and "gpu.count", but there is already a separate count field.
Also, this doesn't account for type, right? Is that what 'units' is supposed to help with?

Contributor

For GPUs the units would be empty. The idea is that if a resource has a unit, like memory, you can express it in GiB, MiB, etc.

Member

Makes sense, and maybe I missed it, but are there docs or examples of this? Does the script actually discover this information about anything? It looks like it's just finding the count and addresses of GPUs.

Contributor

The example script provided only handles GPUs, and units don't apply to GPUs, so it's just finding the addresses. I agree with you that we need some more docs, and I'll comment the script better.

Member

Just make this a case class? you don't need getters then

Contributor

This is a user-facing class, and personally I think it's better to have a real class with getters as a more formal API for the user; it potentially makes later changes easier than having the parameters always public.

Member

That's what a case class is though; it's just auto-generated. You can make parameters private if you want to not expose a getter.

Contributor

That's true. I guess I was just thinking it was more flexible, but I really can't think of anything that would need to be mutable here, so I'll change it.

Member

Later, you can also use its equals/== method and toString without having to write additional code (or, if they need to be customized, define them here).
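A minimal sketch of the trade-off discussed in this thread, using hypothetical names (`PlainResource`, `CaseResource`, `internalTag`) rather than the PR's actual class. It contrasts a hand-written class with a case class whose accessors, `equals`, and `toString` are generated, and shows a `private val` parameter that is kept out of the public API.

```scala
// Hand-written class: accessors exist, but equality is by reference and
// toString is the default one unless both are overridden.
class PlainResource(val name: String, val addresses: Seq[String])

// Case class: accessors, equals/hashCode, toString and copy are generated.
// Marking a parameter `private val` hides its accessor from callers.
final case class CaseResource(
    name: String,
    addresses: Seq[String],
    private val internalTag: String = "")

object CaseClassSketch {
  // false: two distinct instances of a plain class are never == by default.
  def plainEqual: Boolean =
    new PlainResource("gpu", Seq("0")) == new PlainResource("gpu", Seq("0"))

  // true: case classes compare field by field.
  def caseEqual: Boolean =
    CaseResource("gpu", Seq("0")) == CaseResource("gpu", Seq("0"))
}
```

The sketch uses `Seq` for the addresses because an `Array` field compares by reference even inside a case class, so generated equality would not be structural for it.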

Member

"comma-separated"

Member

No big deal but how about just one method that can change the count by a positive or negative amount?

Member

Does this one take value like "1m"?

Member

Just to be tidy, can the members here be private? or use override if they override superclass methods

Member

As above, how about defining an equals method in ResourceInformation? You get this for free with a case class.

Member

Should the new resources arg to resourceOffer be optional, so that not all these callers have to pass a new empty thing?

Member

Do we need this for the driver too?

Member

This is actually used on both the executor and the driver; we should update the comment.

Contributor

remove the extra logInfo messages here and below

Contributor

remove extra line

Contributor

not needed here

Contributor

not needed

Contributor

we need to add in the spark.driver.resource.gpu.discoveryScript and spark.driver.resource.gpu.addresses here

Contributor

indentation looks off

Contributor

add license header.

Also it might be good if we reference this script from some of the discoveryScript documentation

Contributor

Do cluster managers do something special so that, with multiple executors, a script like this finds the right subset of GPUs? I assume standalone does not, but maybe others do? That is worth mentioning somewhere as well (it's also fine if that part of the story is in a later JIRA).

Contributor

The script as is will find all GPUs visible to it, so if the cluster manager doesn't isolate them, it will find all GPUs on the host. In standalone mode, I would expect users to pass the --gpuDevices parameter to the Executor. It would be good to add a warning about that in the config description and here, or to say the config isn't supported in standalone mode.

@tgravescs
Contributor

This PR also contains more than what is needed for this particular JIRA: it also covers SPARK-27024 and SPARK-27374. We should either add those JIRAs to the header here or split it apart. If it's hard to split apart and others are OK with it, I'm OK with it.

Please note that I wrote a bunch of this code for the executor side and discovery changes, so someone else will have to officially approve this.

Member

Is this used anywhere?

Member

In the doc, it might be better to explain what the keys are.

Member

wrong indent?

Member

Where is this used?

Member

It reads as if it assumes no GPU, so it seems the proposal is to return an empty array instead of just throwing a SparkException?

Member

Why do we need this requirement?

Contributor

I think it would help if the exception was more detailed about how resources would get wasted, though it's a bit of a pain to do that (e.g. 4 cores and 3 GPUs means 1 core will always be idle).
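To make the "4 cores and 3 GPUs" case concrete, here is a small sketch of the arithmetic. The object and method names are hypothetical, and treating cores as just another named resource is an assumption made for the illustration; it also assumes a non-empty task requirement map with positive per-task amounts.

```scala
object SlotSketch {
  // Number of tasks each resource could run concurrently on one executor.
  // Integer division: a partial task's worth of a resource is unusable.
  def slotsPerResource(
      executorAmounts: Map[String, Int],
      taskAmounts: Map[String, Int]): Map[String, Int] =
    taskAmounts.map { case (name, perTask) =>
      name -> executorAmounts.getOrElse(name, 0) / perTask
    }

  // Units of each resource that can never be used because some other
  // resource runs out first (the smallest slot count wins).
  def idleAmounts(
      executorAmounts: Map[String, Int],
      taskAmounts: Map[String, Int]): Map[String, Int] = {
    val numSlots = slotsPerResource(executorAmounts, taskAmounts).values.min
    taskAmounts.map { case (name, perTask) =>
      name -> (executorAmounts.getOrElse(name, 0) - numSlots * perTask)
    }
  }
}
```

With 4 cores and 3 GPUs per executor and 1 of each per task, the GPU count limits the executor to 3 concurrent tasks, so `idleAmounts` reports 1 idle core and 0 idle GPUs.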

Member

Is this just for GPUs on the driver? Will it be used?

Contributor

Yes, GPUs on the driver. I think the main use case is standalone mode, or if someone doesn't have isolation. They can't just look on the host and take all the GPUs, as they should only use the ones the cluster manager assigned to them.
Yes, it will be by convention, but it's better than what we have now.

Member

"gpu" -> ResourceInformation.GPU?

Member

User specified GPU resource for task: $GPUS_PER_TASK, but can't find any GPU resources available on the executor.

Member

Remove extra space before This.

Member

This should return -> This script should return

Member

Based on the requirement in SparkConf, it seems we don't allow set(GPUS_PER_TASK.key, "1") for this setting, right? But I think it is a reasonable case.

Member

Why does this line use "gpu" instead of ResourceInformation.GPU?

Contributor

@squito squito left a comment

I didn't go through tests carefully yet, but otherwise just minor stuff, no major red flags

Contributor

maybe mention the config to use here?

Contributor

You don't need to import these since it's the same package (sbt warns pretty loudly about this).

Contributor

Is this used? If so, maybe better to say directly what is being imported, as it's an unusual import.

Contributor

It was being used for stringOf in the log statement to print the Array; we could just use mkString instead.

Contributor

nit: map { ids =>

Contributor

I think I'm missing something; I don't understand what this comment is referring to.

Contributor

along the same lines -- how about removing availableResources from the parameter list entirely, and just have it get created inside ExecutorData from totalResources

Contributor

.foreach { r =>

Contributor

I think it's confusing that we call them "Indices" but they're strings, not ints. But then again, you also have code to check that they can parse as ints. Can this be made consistent? I think it's preferable if we really know they are ints and can change all of the types from String to Int, and then use gpuIndices consistently. But if that is not the case, it's also OK to use gpuAddresses consistently instead.
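One way to picture the two options raised here is a helper that treats addresses as integer indices only when every one of them parses. This is a sketch under that assumption; `asIndices` is a hypothetical name and not part of the PR.

```scala
import scala.util.Try

object AddressParseSketch {
  // Returns Some(indices) only if every address parses as an Int;
  // None signals that the addresses should stay opaque strings.
  def asIndices(addresses: Seq[String]): Option[Seq[Int]] = {
    val parsed = addresses.map(a => Try(a.trim.toInt).toOption)
    if (parsed.forall(_.isDefined)) Some(parsed.flatten) else None
  }
}
```

If a helper like this can ever return `None` for legitimate input, that argues for keeping the type as `String` and using the "addresses" naming throughout.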

@mengxr
Contributor

mengxr commented Apr 16, 2019

@jiangxb1987 I think we should reduce the scope of this PR:

  1. Remove auto-discovery script and executor interface.
  2. Do not consider gpu as a special hardcoded resource name.
  3. Use static conf resource.accelerator_name.addresses to load accelerator addresses and resource.accelerator_name.count as request.

In this way, we can make this PR smaller and leave discovery and executor interface as follow-up work.

@tgravescs
Contributor

@mengxr I think your point 3 has to do with the executor side and the configs, so if we split those out it would apply there. I'm also not sure what you mean by using a static conf there: how do you have a static conf that isn't hardcoded? You would have to build it on the fly to put in the resource type/accelerator_name.

If we want to remove gpu as a known (hardcoded) resource name, then we should go all the way generic on the scheduler pieces. I was going to do that in a follow-on PR, but we could do it here as well if people prefer. Or are you just saying remove the hardcoded configs and still only do gpu for now?

@tgravescs
Contributor

I'll go ahead and try to split out the executor and discovery pieces from this. I think that may be easier to PR first on its own and then build the scheduler pieces next, since the scheduler will need the resource information from the executor to make its decision. I'll try to apply everyone's comments above to that part and update here if it works out.

@squito
Contributor

squito commented Apr 16, 2019

One general thought I have -- there seem to be a lot of changes to do general resource tracking, though only GPUs are supported here. These are all internal classes, so I'm wondering whether it's useful to even put in those abstractions now. Is FPGA support (or whatever other special hardware) still years away? If nobody has at least experimented with it at all, are we sure that the generalizations you're putting in would even be useful in those cases?

I don't really know anything about other accelerators, so I don't have any strong feelings here, just a general concern about putting in abstractions too early. Just wanted to mention it, I'll leave it up to you.

@tgravescs
Contributor

I do know there is a company that sells a product to run Spark on FPGAs. I've also seen multiple talks on it. But I don't know how close it is to being ready for general users.

Personally I think it would be good to just do the generic thing as long as it doesn't add much overhead to the scheduler. I can see people wanting to add in possibly other information like GPU type, memory, etc. I can see a use for virtual GPUs if they want to share a GPU.

I was thinking if we put in the generic pieces people could more easily experiment without having to change core pieces.

@squito
Contributor

squito commented Apr 16, 2019

ok that makes sense, like I said I'm willing to trust your judgement on that. Might be helpful to reach out to that company to at least see if they see any red flags with this approach.

@tgravescs
Contributor

It took me a bit longer to split the executor-side stuff out and make it fully generic; I should have a PR up tomorrow. My thought was that I could put that up, which gives the base for @jiangxb1987 to do the scheduler changes on top of.

@tgravescs
Contributor

I submitted PR #24394 to replace this. @jiangxb1987 can we close this one?

Thanks for everyone's reviews. I tried to apply everyone's comments from here that were applicable to that PR; if I missed anything, please comment there.

@jiangxb1987
Contributor Author

@tgravescs Sure I'm closing this one, thanks!

@tgravescs
Contributor

sorry for any confusion, the executor pr is #24406

@jiangxb1987 jiangxb1987 reopened this Apr 18, 2019
@jiangxb1987 jiangxb1987 changed the title from "[SPARK-27366][CORE] Support GPU Resources in Spark job scheduling" to "[WIP][SPARK-27366][CORE] Support GPU Resources in Spark job scheduling" Apr 18, 2019
@SparkQA

SparkQA commented Apr 18, 2019

Test build #104716 has finished for PR 24374 at commit a251551.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104717 has finished for PR 24374 at commit 40eaca0.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104720 has finished for PR 24374 at commit fe7ed1b.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 18, 2019

Test build #104722 has finished for PR 24374 at commit 054806a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented May 3, 2019

Test build #105098 has finished for PR 24374 at commit 5d5bd46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
      • case class ResourceInformation(

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106038 has finished for PR 24374 at commit f388338.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106040 has finished for PR 24374 at commit d15a51d.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 1, 2019

Test build #106042 has finished for PR 24374 at commit dcc147e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

if (Utils.isTesting) {
  throw new SparkException(message)
} else {
  logWarning(message)
Contributor

Originally we talked about throwing here to not allow it; I just want to make sure we intentionally changed our mind here. I'm really OK either way, as there were some people questioning this on the SPIP.

Contributor Author

Since we now have TaskSchedulerImpl.resourcesMeetTaskRequirements() to ensure there are enough resources before scheduling a task, I think it's safe to just log a warning here.

Contributor

I prefer a warning because the discovery script might return more, and that is out of the user's control. Also, the available resources might not happen to be a multiple of the counts requested per task. For example, you might have 32 CPU cores and 3 GPUs.

if (execCount.toInt / taskCount.toInt != numSlots) {
  val message = s"The value of executor resource config: " +
    s"${SPARK_EXECUTOR_RESOURCE_PREFIX + rName + SPARK_RESOURCE_COUNT_SUFFIX} " +
    s"= $execCount is more than that tasks can take: $numSlots * " +
Contributor

I don't think this makes clear to the user what is wrong. The real issue now is that this ratio isn't the same as some other resource's ratio.
Can we change this message to be more like:

The configuration of resource: rName (exec = X, task = y) will result in wasted resources due to resource $limitingResourceName limiting the # of runnable tasks per executor to: numslots. Please adjust your configuration.

Contributor Author

Updated

    val name: String,
    private val addresses: Seq[String]) extends Serializable {

  private val addressesMap = new HashMap[String, Boolean]()
Contributor

can we call this addressesAllocatedMap or similar

Contributor Author

Updated

   */
  def acquire(addrs: Seq[String]): Unit = {
    addrs.foreach { address =>
      val isAvailable = addressesMap.getOrElse(address, false)
Contributor

Can we rename isAvailable to isAssigned, or vice versa, to keep acquire and release consistent?

Contributor Author

When the address doesn't exist we may also want to throw an exception. Added more comments to make it clear.

task.resources.foreach { case (rName, rInfo) =>
  availableResources(i).getOrElse(rName,
    throw new SparkException(s"Try to acquire resource $rName that doesn't exist."))
    .remove(0, rInfo.addresses.size)
Contributor

I think this is worth a comment saying that we remove the first x elements, which will be the same ones we allocated in taskSet.resourceOffer since it's synchronized (rather than looking up the exact ones allocated).
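A small sketch of the behaviour this comment asks to document, with hypothetical names (`BufferTakeSketch`, `takeFirst`). It removes the first `n` addresses from a mutable pool in the same way the diff's `.remove(0, rInfo.addresses.size)` call does, and returns them so the effect is visible.

```scala
import scala.collection.mutable.ArrayBuffer

object BufferTakeSketch {
  // Drops the first `n` addresses from the available pool and returns them.
  // These match "the addresses handed out in the offer" only if nothing else
  // mutates the buffer between the offer and this call.
  def takeFirst(available: ArrayBuffer[String], n: Int): Seq[String] = {
    require(n <= available.size, s"requested $n but only ${available.size} available")
    val taken = available.take(n).toList
    available.remove(0, n)
    taken
  }
}
```

The correctness of removing by position rather than by value rests entirely on that no-interleaving assumption, which is why a code comment is worth having.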

Contributor Author

Updated

@SparkQA

SparkQA commented Jun 4, 2019

Test build #106135 has finished for PR 24374 at commit cd01cae.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

test this please

@SparkQA

SparkQA commented Jun 4, 2019

Test build #106152 has finished for PR 24374 at commit cd01cae.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@mengxr mengxr left a comment

will check the tests later today

// large enough if any task resources were specified.
taskResourcesAndCount.foreach { case (rName, taskCount) =>
  val execCount = executorResourcesAndCounts(rName)
  if (execCount.toInt / taskCount.toInt != numSlots) {
Contributor

9 / 4 == 2. Use taskCount.toInt * numSlots < execCount.toInt
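The point about integer division can be checked with a short sketch. The two methods below (hypothetical names) encode the original condition from the diff and the suggested replacement, using plain `Int` parameters instead of the config strings.

```scala
object WasteCheckSketch {
  // Original form from the diff: truncating division hides a remainder,
  // so 9 / 4 == 2 looks like a perfect fit for 2 slots.
  def divisionCheck(execCount: Int, taskCount: Int, numSlots: Int): Boolean =
    execCount / taskCount != numSlots

  // Suggested form: flags any executor amount beyond what numSlots tasks use.
  def productCheck(execCount: Int, taskCount: Int, numSlots: Int): Boolean =
    taskCount * numSlots < execCount
}
```

For an executor amount of 9, a per-task amount of 4, and 2 slots, `divisionCheck` returns false (no warning) while `productCheck` returns true, because one unit is left unused.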

Contributor Author

updated

 */
private[spark] class ExecutorResourceInfo(
    val name: String,
    private val addresses: Seq[String]) extends Serializable {
Contributor

Remove private val. addresses doesn't need to be a member variable.

Contributor Author

updated

    val name: String,
    private val addresses: Seq[String]) extends Serializable {

  private val addressesAllocatedMap = new HashMap[String, Boolean]()
Contributor

  • Could you leave a TODO here to test OpenHashMap performance?
  • Rename addressesAllocatedMap to addressAvailabilityMap

Contributor Author

updated


package org.apache.spark.scheduler

import scala.collection.mutable.HashMap
Contributor

Shall we only import mutable and use mutable.HashMap in code?

Contributor Author

updated

addrs.foreach { address =>
  val isAvailable = addressesAllocatedMap.getOrElse(address, false)
  if (isAvailable) {
    addressesAllocatedMap(address) = false
Contributor

Could you update the class ScalaDoc and mention that this class is intended to be used in a single thread?

Contributor Author

updated

   */
  def release(addrs: Seq[String]): Unit = {
    addrs.foreach { address =>
      val isAssigned = addressesAllocatedMap.getOrElse(address, true)
Contributor

  • The isAssigned name is really confusing. It should be isAvailable.
  • Same here. Separate non-exist from assigned.
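Pulling the suggestions in this part of the review together (a single availability map, one consistent `isAvailable` meaning, and a distinct error for unknown addresses), a minimal sketch could look like the following. `ResourcePoolSketch` is a hypothetical stand-in for `ExecutorResourceInfo`, and it throws standard JVM exceptions where the PR uses `SparkException`.

```scala
import scala.collection.mutable

// Not thread-safe: intended to be driven from a single scheduler thread.
class ResourcePoolSketch(val name: String, addresses: Seq[String]) {
  // address -> true when free, false when assigned to a task
  private val addressAvailabilityMap =
    mutable.HashMap(addresses.map(_ -> true): _*)

  def availableAddrs: Seq[String] =
    addressAvailabilityMap.collect { case (addr, true) => addr }.toList.sorted

  def acquire(addrs: Seq[String]): Unit = addrs.foreach { addr =>
    addressAvailabilityMap.get(addr) match {
      case None =>
        throw new IllegalArgumentException(s"Unknown $name address: $addr")
      case Some(false) =>
        throw new IllegalStateException(s"$name address $addr is already assigned")
      case Some(true) =>
        addressAvailabilityMap(addr) = false
    }
  }

  def release(addrs: Seq[String]): Unit = addrs.foreach { addr =>
    addressAvailabilityMap.get(addr) match {
      case None =>
        throw new IllegalArgumentException(s"Unknown $name address: $addr")
      case Some(true) =>
        throw new IllegalStateException(s"$name address $addr is not assigned")
      case Some(false) =>
        addressAvailabilityMap(addr) = true
    }
  }
}
```

Matching on `Option[Boolean]` keeps the "does not exist" case separate from the "wrong state" case, which a `getOrElse` with a default value cannot do.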

Contributor Author

updated

   * Exposed for testing only.
   */
  private[scheduler] def assignedAddrs: Seq[String] =
    addressesAllocatedMap.toList.filter(_._2 == false).map(_._1)
Contributor

Ditto.

Contributor Author

updated

      host: String,
-     maxLocality: TaskLocality.TaskLocality)
+     maxLocality: TaskLocality.TaskLocality,
+     availableResources: Map[String, Buffer[String]] = Map.empty)
Contributor

Could you change the value type to Seq[String] since this method shouldn't change it?

Contributor Author

updated

executorDataMap.get(executorId) match {
  case Some(executorInfo) =>
    executorInfo.freeCores += scheduler.CPUS_PER_TASK
    for ((k, v) <- resources) {
Contributor

resources.foreach {

Contributor Author

updated

private def serializeResources(map: immutable.Map[String, ResourceInformation],
    dataOut: DataOutputStream): Unit = {
  dataOut.writeInt(map.size)
  for ((key, value) <- map) {
Contributor

map.foreach

Contributor Author

updated

dataOut.writeUTF(key)
dataOut.writeUTF(value.name)
dataOut.writeInt(value.addresses.size)
for (identifier <- value.addresses) {
Contributor

value.addresses.foreach

Contributor Author

updated

    immutable.Map[String, ResourceInformation] = {
  val map = new HashMap[String, ResourceInformation]()
  val mapSize = dataIn.readInt()
  for (i <- 0 until mapSize) {
Contributor

use while instead of for

Contributor Author

updated

val name = dataIn.readUTF()
val numIdentifier = dataIn.readInt()
val identifiers = new ArrayBuffer[String](numIdentifier)
for (j <- 0 until numIdentifier) {

Contributor

ditto

Contributor Author

updated
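
For readers following the serialization comments above, here is a rough, self-contained sketch of the wire layout the snippets describe: entry count, then per entry the key, the resource name, the address count, and each address. It writes with `foreach` and reads with `while` loops, as the reviewer requested. `Resource` is a simplified stand-in for `ResourceInformation`, and `roundTrip` is a hypothetical helper added only for the example. The preference for `while` over `for` on a range is, as far as I know, the usual Spark style advice for performance-sensitive paths; treat that rationale as an assumption.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

object ResourceSerDeSketch {
  // Simplified stand-in for ResourceInformation: a name plus its addresses.
  final case class Resource(name: String, addresses: Seq[String])

  // Layout: map size, then (key, name, address count, addresses...) per entry.
  def serialize(map: Map[String, Resource], out: DataOutputStream): Unit = {
    out.writeInt(map.size)
    map.foreach { case (key, value) =>
      out.writeUTF(key)
      out.writeUTF(value.name)
      out.writeInt(value.addresses.size)
      value.addresses.foreach(a => out.writeUTF(a))
    }
  }

  // Reads the same layout back, using while loops instead of for over a range.
  def deserialize(in: DataInputStream): Map[String, Resource] = {
    val builder = Map.newBuilder[String, Resource]
    val mapSize = in.readInt()
    var i = 0
    while (i < mapSize) {
      val key = in.readUTF()
      val name = in.readUTF()
      val numAddresses = in.readInt()
      val addresses = new ArrayBuffer[String](numAddresses)
      var j = 0
      while (j < numAddresses) {
        addresses += in.readUTF()
        j += 1
      }
      builder += (key -> Resource(name, addresses.toList))
      i += 1
    }
    builder.result()
  }

  // Hypothetical helper: serialize to a byte array and read it back.
  def roundTrip(map: Map[String, Resource]): Map[String, Resource] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    serialize(map, out)
    out.flush()
    deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }
}
```

A round trip such as `ResourceSerDeSketch.roundTrip(Map("gpu" -> Resource("gpu", Seq("0", "1"))))` should return a map equal to its input, which is a convenient property to assert in a unit test.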

* @param executorHost The hostname that this executor is running on
* @param freeCores The current number of cores available for work on the executor
* @param totalCores The total number of cores available to the executor
* @param totalResources The information of all resources on the executor

Contributor

It seems totalResources is only used in tests. Shall we remove it and rename availableResources to resourceInfo? It carries all the info we need.

Contributor

We can remove it now if you want. It will be needed for the UI work, but we can add it back as part of that change.

Contributor

resourceInfo is a superset of totalResources, and the extra info it carries is also needed by the UI.

Contributor Author

Maybe we should move resourceInfo into ExecutorInfo? Or should we do that later, when we consider the UI work for extra resources?

Contributor

I would say we try to get this one merged and do the UI work separately; any changes needed for the UI can be made there. Just remove totalResources for now.

Contributor Author

Sure!

Contributor Author

removed

SparkQA commented Jun 4, 2019

Test build #106164 has finished for PR 24374 at commit e539097.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Jun 4, 2019

Test build #106167 has finished for PR 24374 at commit 82cd1e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

mengxr commented Jun 4, 2019

LGTM and merged into master. Thanks!

asfgit closed this in ac808e2 Jun 5, 2019
@jiangxb1987
Contributor Author

Thanks very much! @tgravescs @mengxr @squito @srowen @viirya @kiszk

emanuelebardelli pushed a commit to emanuelebardelli/spark that referenced this pull request Jun 15, 2019
## What changes were proposed in this pull request?

This PR adds support for scheduling tasks with extra resource requirements (e.g. GPUs) on executors with available resources. It also introduces a new method `TaskContext.resources()` so tasks can access the resource addresses allocated to them.

## How was this patch tested?

* Added new end-to-end test cases in `SparkContextSuite`;
* Added new test case in `CoarseGrainedSchedulerBackendSuite`;
* Added new test case in `CoarseGrainedExecutorBackendSuite`;
* Added new test case in `TaskSchedulerImplSuite`;
* Added new test case in `TaskSetManagerSuite`;
* Updated existing tests.

Closes apache#24374 from jiangxb1987/gpu.

Authored-by: Xingbo Jiang
Signed-off-by: Xiangrui Meng