@@ -67,9 +67,9 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
*/
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
if ((notRunningUnitTests || testExceptionThrown) && !isStandaloneOrLocalCluster) {
throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
"cluster for now when dynamic allocation is disabled.")
if ((notRunningUnitTests || testExceptionThrown) && !(isStandaloneOrLocalCluster || isYarn)) {
throw new SparkException("TaskResourceProfiles are only supported for Standalone and " +
"Yarn cluster for now when dynamic allocation is disabled.")
}
} else {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
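
To make the change above concrete, here is a minimal usage sketch (not part of this patch, and with placeholder resource amounts and a placeholder RDD): a task-only profile attached to a stage in a YARN application that runs with dynamic allocation disabled.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Assumes an existing SparkContext `sc` started with master "yarn" and
// spark.dynamicAllocation.enabled=false (both assumptions of this sketch).
def gpuStage(sc: SparkContext): Long = {
  val taskReqs = new TaskResourceRequests().resource("gpu", 1)
  // Only task requirements are supplied, so the resulting profile behaves as a
  // task-level profile: tasks gain the GPU requirement while reusing the
  // executors that were requested at application startup.
  val taskProfile = new ResourceProfileBuilder().require(taskReqs).build()

  sc.parallelize(1 to 100, 4)
    .withResources(taskProfile)
    .map(_ * 2)
    .count()
}
```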
@@ -126,18 +126,29 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val defaultProf = rpmanager.defaultResourceProfile
assert(rpmanager.isSupported(defaultProf))

// task resource profile.
// Standalone: supports task resource profile.
val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1)
val taskProf = new TaskResourceProfile(gpuTaskReq.requests)
assert(rpmanager.isSupported(taskProf))

// Local: doesn't support task resource profile.
conf.setMaster("local")
rpmanager = new ResourceProfileManager(conf, listenerBus)
val error = intercept[SparkException] {
rpmanager.isSupported(taskProf)
}.getMessage
assert(error === "TaskResourceProfiles are only supported for Standalone " +
"cluster for now when dynamic allocation is disabled.")
"and Yarn cluster for now when dynamic allocation is disabled.")

// Local cluster: supports task resource profile.
conf.setMaster("local-cluster[1, 1, 1024]")
rpmanager = new ResourceProfileManager(conf, listenerBus)
assert(rpmanager.isSupported(taskProf))

// Yarn: supports task resource profile.
conf.setMaster("yarn")
rpmanager = new ResourceProfileManager(conf, listenerBus)
assert(rpmanager.isSupported(taskProf))
}

test("isSupported task resource profiles with dynamic allocation enabled") {
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -3668,7 +3668,7 @@ See your cluster manager specific page for requirements and details on each of -
# Stage Level Scheduling Overview

The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows different stages to run with executors that have different resources. A prime example is an ETL stage that runs with executors that only need CPUs, followed by an ML stage that needs GPUs. Stage level scheduling allows the user to request executors with GPUs when the ML stage runs, rather than having to acquire executors with GPUs at the start of the application and have them sit idle while the ETL stage runs.
This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.
This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at the stage level, and this is currently supported on YARN and Standalone clusters. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.

See the `RDD.withResources` and `ResourceProfileBuilder` APIs for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. When dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that requires a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more than one ResourceProfile with an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.
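
As an illustrative sketch of the APIs just mentioned (the resource amounts, discovery script path, and RDD are assumptions, not values from the Spark docs), the ML stage from the example above could request GPU executors like this when dynamic allocation is enabled:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// With dynamic allocation enabled, a profile carrying executor requirements makes
// Spark acquire new executors matching it; idle executors later time out as usual.
def mlStage(sc: SparkContext): Long = {
  val execReqs = new ExecutorResourceRequests()
    .cores(4)
    .resource("gpu", 1, discoveryScript = "/opt/spark/scripts/getGpus.sh") // placeholder path
  val taskReqs = new TaskResourceRequests().resource("gpu", 1)

  val mlProfile = new ResourceProfileBuilder()
    .require(execReqs)
    .require(taskReqs)
    .build()

  sc.range(0, 1000)
    .withResources(mlProfile) // only this stage runs on the GPU executors
    .map(x => x * x)
    .count()
}
```

Attaching more than one profile to the same RDD still triggers the `spark.scheduler.resource.profileMergeConflicts` behavior described above.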

6 changes: 5 additions & 1 deletion docs/running-on-yarn.md
@@ -759,7 +759,11 @@ YARN does not tell Spark the addresses of the resources allocated to each contai

# Stage Level Scheduling Overview

Stage level scheduling is supported on YARN when dynamic allocation is enabled. One thing to note that is YARN specific is that each ResourceProfile requires a different container priority on YARN. The mapping is simply the ResourceProfile id becomes the priority, on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind.
Stage level scheduling is supported on YARN:
- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup.
- When dynamic allocation is enabled: It allows users to specify task and executor resource requirements at the stage level and will request the extra executors.
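
For example, a minimal configuration for the dynamic-allocation-disabled case might look like the sketch below; the GPU amounts, application name, and discovery script path are placeholders, not values from this change.

```scala
import org.apache.spark.SparkConf

// Executors are sized once at startup; per-stage TaskResourceRequests (via
// RDD.withResources) can then change how many tasks share each executor's GPUs.
val conf = new SparkConf()
  .setMaster("yarn")
  .setAppName("stage-level-scheduling-demo") // placeholder name
  .set("spark.dynamicAllocation.enabled", "false")
  .set("spark.executor.resource.gpu.amount", "1")
  .set("spark.executor.resource.gpu.discoveryScript", "/opt/spark/scripts/getGpus.sh") // placeholder path
  .set("spark.task.resource.gpu.amount", "0.5") // default: two tasks share each GPU
```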

One YARN-specific thing to note is that each ResourceProfile requires a different container priority on YARN. The mapping is simple: the ResourceProfile id becomes the priority, and on YARN lower numbers are higher priority. This means that profiles created earlier will have a higher priority in YARN. Normally this won't matter since Spark finishes one stage before starting another; the only case this might have an effect is in a job server type scenario, so it's something to keep in mind.
Note there is a difference in the way custom resources are handled between the base default profile and custom ResourceProfiles. To allow the user to request YARN containers with extra resources without Spark scheduling on them, the user can specify resources via the <code>spark.yarn.executor.resource.</code> config. Those configs are only used in the base default profile though and do not get propagated into any other custom ResourceProfiles. This is because there would be no way to remove them if you wanted a stage to not have them. This results in your default profile getting the custom resources defined in <code>spark.yarn.executor.resource.</code> plus the Spark-defined GPU or FPGA resources. Spark converts GPU and FPGA resources into the YARN built-in types <code>yarn.io/gpu</code> and <code>yarn.io/fpga</code>, but does not know the mapping of any other resources. Any other Spark custom resources are not propagated to YARN for the default profile. So if you want Spark to schedule based off a custom resource and have it requested from YARN, you must specify it in both YARN (<code>spark.yarn.{driver/executor}.resource.</code>) and Spark (<code>spark.{driver/executor}.resource.</code>) configs. Leave the Spark config off if you only want YARN containers with the extra resources but don't want Spark to schedule using them. Custom ResourceProfiles, however, currently have no way to specify YARN-only resources that Spark should not schedule on, so for custom ResourceProfiles all resources defined in the ResourceProfile are propagated to YARN. GPU and FPGA are still converted to the YARN built-in types as well. This requires that the name of any custom resource you specify matches what it is defined as in YARN.
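
As a hedged sketch of the "specify it in both places" rule (the resource name `acceleratorX`, the amounts, and the script path are illustrative assumptions), the configs might look like:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  // YARN side: makes YARN allocate containers carrying the resource; the name
  // must match how the resource is defined in YARN.
  .set("spark.yarn.executor.resource.acceleratorX.amount", "2")
  // Spark side: makes Spark schedule tasks against the same resource.
  .set("spark.executor.resource.acceleratorX.amount", "2")
  .set("spark.executor.resource.acceleratorX.discoveryScript", "/opt/spark/scripts/findAcceleratorX.sh") // placeholder
  .set("spark.task.resource.acceleratorX.amount", "1")
// Drop the spark.executor.resource.* / spark.task.resource.* entries if you only
// want the YARN containers to have the resource without Spark scheduling on it.
```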

# Important notes