[SPARK-27371][CORE] Support GPU-aware resources scheduling in Standalone #25047
Conversation
val requests = parseAllResourceRequests(_conf, SPARK_DRIVER_PREFIX).map { req =>
  req.id.resourceName -> req.amount
}.toMap
// TODO(wuyi) log driver's acquired resources separately ?
Hi, @Ngone51. Please don't use user-id TODOs in the patch. As you know, the Apache Spark repository already has a few ancient user-id TODOs like this that have never been fixed. :)
Since we don't know the future, let's use a JIRA-IDed TODO like TODO(SPARK-XXX), as illustrated below.
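For instance (the JIRA number below is only a placeholder, not a real ticket):

// discouraged: user-id TODO
// TODO(wuyi) log driver's acquired resources separately ?
// preferred: JIRA-IDed TODO
// TODO(SPARK-XXXXX): log driver's acquired resources separately?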
@dongjoon-hyun Thank you for the reminder. I'll fix those TODOs in the following commits.
}

def hasEnoughResources(resourcesFree: Map[String, Int], resourceReqs: Map[String, Int])
  : Boolean = {
nit. indentation.
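For context, a minimal sketch of what such a check could look like, assuming resourcesFree maps resource names to free counts and resourceReqs maps names to requested amounts (an illustration, not necessarily the PR's exact body):

def hasEnoughResources(
    resourcesFree: Map[String, Int],
    resourceReqs: Map[String, Int]): Boolean = {
  // every requested resource must have at least the requested amount free
  resourceReqs.forall { case (rName, amount) =>
    resourcesFree.getOrElse(rName, 0) >= amount
  }
}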
  val delegate = brothers.head._2
  delegate.endpoint.send(ReleaseResources(worker.resourcesCanBeReleased))
} else {
  // TODO(wuyi5) cases here are hard to handle:
ditto.
 * @return
 */
def acquireResources(resourceReqs: Map[String, Int])
  : Map[String, Seq[String]] = {
nit. indentation.
def acquireResources(resourceReqs: Map[String, Int])
  : Map[String, Seq[String]] = {
  resourceReqs.map { case (rName, amount) =>
    // TODO (wuyi) rName does not exists ?
ditto for (wuyi).
What does the comment mean? Why wouldn't rName exist?
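To illustrate the concern: if resourceReqs ever contains a resource name the worker never registered, a bare lookup into the worker's allocators would fail. A hedged sketch of a defensive variant (the `available` parameter and error handling are assumptions for illustration, not the PR's code):

def acquireResources(
    available: Map[String, Seq[String]],
    resourceReqs: Map[String, Int]): Map[String, Seq[String]] = {
  resourceReqs.map { case (rName, amount) =>
    // guard against a resource name the worker doesn't actually provide
    val addresses = available.getOrElse(rName,
      throw new IllegalArgumentException(s"Worker provides no resource named '$rName'"))
    require(addresses.size >= amount, s"Not enough '$rName' addresses free")
    rName -> addresses.take(amount)
  }
}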
  case e: Exception =>
    logError("Failed to create work directory " + workDir, e)
    System.exit(1)
if (!Utils.createDirectory(workDir)) {
Ur, is it the same? Utils.createDirectory seems to work differently from workDir.mkdirs().
Or do we need to change the current behavior of createWorkDir for this PR?
Oops. I realized that this is a newly added function in this PR.
I was confused because it overloads the existing one; only the parameters are different.
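For reference, a hedged sketch of what such a boolean-returning overload might boil down to (assumed for illustration; the PR's actual helper may retry or log differently). The source of the confusion is that java.io.File.mkdirs() alone returns false when the directory already exists, so the two code paths are not interchangeable:

import java.io.File

// Assumed illustration: treat "already exists" as success, unlike a bare mkdirs() check.
def createDirectory(dir: File): Boolean = dir.isDirectory || dir.mkdirs()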
 * @param addresses Resource addresses provided by the executor/worker
 */
class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializable {
  /**
indentation.
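For orientation, a rough sketch of the kind of bookkeeping such an allocator performs, assuming it hands out concrete addresses (e.g. GPU indices) and takes them back on release (an illustration, not the PR's exact implementation):

import scala.collection.mutable

class ResourceAllocator(name: String, addresses: Seq[String]) extends Serializable {
  // addresses not yet handed out to any executor/driver
  private val available = mutable.Set(addresses: _*)

  def acquire(amount: Int): Seq[String] = {
    require(available.size >= amount, s"Not enough '$name' addresses left")
    val acquired = available.take(amount).toSeq
    available --= acquired
    acquired
  }

  def release(addrs: Seq[String]): Unit = available ++= addrs

  def availableAddrs: Seq[String] = available.toSeq
}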
Test build #107211 has finished for PR 25047 at commit

Test build #107229 has finished for PR 25047 at commit

Test build #107243 has finished for PR 25047 at commit

Test build #107249 has finished for PR 25047 at commit

Test build #107276 has finished for PR 25047 at commit

Test build #107285 has finished for PR 25047 at commit
I have a few general questions; note I haven't looked at all of the code yet. I'm not an expert in standalone mode, but it supports both a client mode and a cluster mode. In your description, are you saying even the client mode will use the resource file and lock it? How do you know the client is running on a node with GPUs or a worker? I guess as long as the location is the same it doesn't matter. This is one thing in YARN we never handled; in client mode the user is on their own for resource coordination. It seems unreliable to assume you have multiple workers per node (for the case where a worker crashes). When the worker dies it automatically kills any executors, correct? Is there a chance it doesn't?
Test build #108714 has finished for PR 25047 at commit
private[spark] val SPARK_TASK_PREFIX = "spark.task"

private[spark] val SPARK_RESOURCES_COORDINATE =
  ConfigBuilder("spark.resources.coordinate.enable")
Did you mean to turn this to false? If so, we need to update configuration.md to match. I'm OK either way.
Oops, I misunderstood your comment. I'd prefer it to be true. The test failures are probably caused by this change.
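For example, disabling coordination (using the config name from this PR) would look like the sketch below; with it off, the user must hand each Worker/Driver on a host non-overlapping resources themselves:

import org.apache.spark.SparkConf

// assumes the default stays true, as preferred above
val conf = new SparkConf()
  .set("spark.resources.coordinate.enable", "false")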
test this please
  }
}

test("Workers should avoid resources conflict when launch from the same host") {
It would be nice to add a test with SPARK_RESOURCES_COORDINATE off to make sure all the resources from the file/discovery script are returned properly.
Good idea.
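A rough, assumed outline of the suggested test (the suite name and setup are hypothetical; the real test would live in WorkerSuite and start an actual Worker):

import org.apache.spark.SparkConf
import org.scalatest.funsuite.AnyFunSuite

class ResourcesCoordinationSuite extends AnyFunSuite {
  test("resources from file/discovery are returned unfiltered when coordination is off") {
    val conf = new SparkConf().set("spark.resources.coordinate.enable", "false")
    // With coordination disabled, a Worker started with a resources file listing
    // gpu addresses "0" and "1" should expose exactly those addresses,
    // i.e. nothing is excluded via allocated_resources.json.
  }
}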
docs/spark-standalone.md
<td>(none)</td>
<td>
  Path to resources file which is used to find various resources while worker starting up.
  The content of resources file should be formatted like the <code>ResourceAllocation</code> class.
I just realized this is supposed to be an array of ResourceAllocations
Exactly.
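To make that concrete, a hedged example of what such a file could contain, assuming the JSON fields mirror ResourceAllocation(id: ResourceID, addresses) with ResourceID carrying a component name and a resource name (the exact field names are an assumption here):

[
  {
    "id": { "componentName": "spark.worker", "resourceName": "gpu" },
    "addresses": ["0", "1", "2"]
  }
]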
Test build #108721 has finished for PR 25047 at commit

Test build #108725 has finished for PR 25047 at commit
@Ngone51 can you remove the [WIP] from the description? I think this is really close. I was going to make one more pass through, but things look good.

@tgravescs Thanks for the reminder. I have updated the title and description.
  .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
    worker.coresFree >= coresPerExecutor)
  .filter(canLaunchExecutor(_, app.desc))
  .sortBy(_.coresFree).reverse
We have an issue here: if no Workers have the resources required by the executor/task requirements, it doesn't warn/error and it doesn't retry. Basically, I started a Worker without GPUs, then said I need GPUs for my executor tasks, and it ended up hanging. I suppose one could argue this is OK, since someone could start another Worker that has the resources, but I think we at least need to warn about it.
Actually, it does retry when other executors or drivers finish. But we can warn if an executor or driver requires more resources than any of the Workers could ever have. BTW, I'm wondering whether we have the same issue for memory and cores. For example, a Worker has at most 10 cores while an executor asks for 20 cores?
Right, if something changes (other app, other workers, etc.) it retries, but if I'm the only app on the cluster it's not clear why the app isn't launching. The one thing I don't want is for it to be too noisy either. I thought about that before making the comment, because like you said, if it's just out of resources because other apps are running, we don't really want to print anything. I think for now we should just limit it to resources and perhaps just say no Workers are configured with the resources you requested. If we can do that without much performance impact, let's do it. If not, maybe we just file a separate JIRA for it and look at it there.
How about this way?

for (app <- waitingApps) {
  ...
  val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
    .filter(canLaunchExecutor(_, app.desc))
    .sortBy(_.coresFree).reverse
  if (waitingApps.size == 1 && usableWorkers.isEmpty) {
    logWarning("The app requires more resources(mem, core, accelerator) than any of Workers could have.")
  }
  ...
}

Telling the user "the Workers are not configured with the resources (I mean accelerators) the app requested" may require more changes. For example, you may need to traverse the workers again to judge whether the failure is due to resources (accelerators), memory, or cores. Or you would need to refactor canLaunchExecutor to report more details.
That sounds good for now. Let's also leave it called "resource" since that is what it's called everywhere right now; just leave off the (mem, core, accelerator) part.
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
  if (canLaunchDriver(worker, driver.desc)) {
    val allocated = worker.acquireResources(driver.desc.resourceReqs)
    driver.withResources(allocated)
I'm assuming the driver could also fail to launch if the worker doesn't have any GPUs and the driver requests them; we may want to warn here as well.
 */
private[spark] case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
@Evolving
case class ResourceAllocation(id: ResourceID, addresses: Seq[String]) {
Sorry, I just realized we made this public but ResourceID is still private. Let's make this private again and just put the format in the docs like you had it before. Again, sorry for switching on you here. If we end up thinking users will find this useful, we can open it up to be public later.
never mind.
Test build #108858 has finished for PR 25047 at commit
tgravescs left a comment
looks good, thanks @Ngone51
Thank you for your help @tgravescs
What changes were proposed in this pull request?
In this PR, we implement a complete process of GPU-aware resource scheduling
in Standalone. The whole process works as follows: a Worker sets up isolated resources
when it starts up and registers with the Master along with its resources. The Master
then picks usable Workers according to the driver/executor's resource requirements and
launches the driver/executor on them. The Worker launches the driver/executor after
preparing a resources file, created under the driver/executor's working directory,
containing the specific resource addresses assigned by the Master. When a driver/executor
finishes, its resources are recycled back to the Worker. Finally, when a Worker stops, it
always releases its resources first.
For the case where Workers and Drivers in client mode run on the same host, we introduce
a config option named spark.resources.coordinate.enable (default true) to indicate
whether Spark should coordinate resources for the user. If
spark.resources.coordinate.enable=false, the user is responsible for configuring different resources for Workers and Drivers when using a resourcesFile or discovery script. If true, Spark helps the user assign different resources to Workers and Drivers. The solution for Spark to coordinate resources among Workers and Drivers is:
Generally, we use a shared file named allocated_resources.json to sync allocated
resource info among Workers and Drivers on the same host.
After a Worker or Driver has found all resources using the configured resourcesFile and/or
discovery script during launch, it filters out the available resources by excluding resources already allocated in allocated_resources.json, and acquires resources from the remaining ones according to its own requirements. After that, it writes its allocated resources along with its process id (pid) into allocated_resources.json. The pid (proposed by @tgravescs) is used to check whether the allocated resources are still valid, in case a Worker or Driver crashes and doesn't release its resources properly. When a Worker or Driver finishes, it normally cleans up its own allocated resources in allocated_resources.json.
Note that we always take a file lock before any access to allocated_resources.json
and release the lock afterwards.
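A minimal sketch of that locking discipline, assuming a plain java.nio exclusive file lock wrapped around every read-modify-write of the shared file (an illustration of the idea, not the PR's exact utility):

import java.nio.channels.FileChannel
import java.nio.file.{Paths, StandardOpenOption}

def withResourcesLock[T](lockFilePath: String)(body: => T): T = {
  val channel = FileChannel.open(
    Paths.get(lockFilePath), StandardOpenOption.CREATE, StandardOpenOption.WRITE)
  val lock = channel.lock() // blocks until the exclusive lock is acquired
  try {
    // e.g. read allocated_resources.json, pick free addresses, write the updated content back
    body
  } finally {
    lock.release()
    channel.close()
  }
}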
Furthermore, we append resource info to WorkerSchedulerStateResponse to work around Master-change behaviour in HA mode.
How was this patch tested?
Added unit tests in WorkerSuite, MasterSuite, SparkContextSuite.
Manually tested client/cluster mode (e.g. multiple workers) in a single-node Standalone deployment.