diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
index cd7124a572464..afbacb8013645 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala
@@ -67,9 +67,10 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
*/
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
- if ((notRunningUnitTests || testExceptionThrown) && !(isStandaloneOrLocalCluster || isYarn)) {
- throw new SparkException("TaskResourceProfiles are only supported for Standalone and " +
- "Yarn cluster for now when dynamic allocation is disabled.")
+ if ((notRunningUnitTests || testExceptionThrown) &&
+ !(isStandaloneOrLocalCluster || isYarn || isK8s)) {
+ throw new SparkException("TaskResourceProfiles are only supported for Standalone, " +
+ "Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")
}
} else {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
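For context, a resource profile that carries only task-level requirements is what ends up as a `TaskResourceProfile`, the case guarded by the check above. Below is a minimal sketch of building such a profile; the GPU resource name and the assumption that a task-only builder produces this kind of profile are illustrative, based on the public `ResourceProfileBuilder` API rather than on this patch itself.

```scala
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Only task requirements are specified, so the resulting profile carries no
// executor requests; this is the task-only case that the check above now
// accepts on k8s:// masters when dynamic allocation is disabled.
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val taskOnlyProfile = new ResourceProfileBuilder().require(taskReqs).build
```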
diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 77dc7bcb4c56e..7149267583bc5 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -137,8 +137,8 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
val error = intercept[SparkException] {
rpmanager.isSupported(taskProf)
}.getMessage
- assert(error === "TaskResourceProfiles are only supported for Standalone " +
- "and Yarn cluster for now when dynamic allocation is disabled.")
+ assert(error === "TaskResourceProfiles are only supported for Standalone, " +
+ "Yarn and Kubernetes cluster for now when dynamic allocation is disabled.")

// Local cluster: supports task resource profile.
conf.setMaster("local-cluster[1, 1, 1024]")
@@ -149,6 +149,11 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
conf.setMaster("yarn")
rpmanager = new ResourceProfileManager(conf, listenerBus)
assert(rpmanager.isSupported(taskProf))
+
+ // K8s: supports task resource profile.
+ conf.setMaster("k8s://foo")
+ rpmanager = new ResourceProfileManager(conf, listenerBus)
+ assert(rpmanager.isSupported(taskProf))
}

test("isSupported task resource profiles with dynamic allocation enabled") {
diff --git a/docs/configuration.md b/docs/configuration.md
index b95a063dbf8d2..bd908f3b34d78 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -3668,7 +3668,7 @@ See your cluster manager specific page for requirements and details on each of -
# Stage Level Scheduling Overview
The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run.
-This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on YARN and Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.
+This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at the stage level, and this is supported on YARN, Kubernetes and Standalone clusters right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details.
See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors with `DEFAULT_RESOURCE_PROFILE`. While when dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources.
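To make the `RDD.withResources` and `ResourceProfileBuilder` flow described above concrete, here is a rough, self-contained sketch; the GPU resource name, core counts, and discovery script path are illustrative assumptions, not values from the Spark docs.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

object StageLevelSchedulingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("stage-level-scheduling-sketch").getOrCreate()
    val sc = spark.sparkContext

    // Executor requirements only take effect with dynamic allocation enabled,
    // where Spark acquires new executors matching this profile.
    val execReqs = new ExecutorResourceRequests()
      .cores(4)
      .resource("gpu", 1, "/opt/getGpusResources.sh") // discovery script path is an assumption

    // Each task of the stage asks for 1 CPU core and 1 GPU.
    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)

    val rp = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

    // Stages computed from this RDD are scheduled with the profile above.
    val doubled = sc.range(0, 1000).map(_ * 2).withResources(rp).collect()
    println(s"count = ${doubled.length}")

    spark.stop()
  }
}
```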
diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index d585f8c7373fa..24e0575d83e4d 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -1949,5 +1949,7 @@ With the above configuration, the job will be scheduled by YuniKorn scheduler in
### Stage Level Scheduling Overview
-Stage level scheduling is supported on Kubernetes when dynamic allocation is enabled. This also requires spark.dynamicAllocation.shuffleTracking.enabled to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles is requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages that used a different ResourceProfile may not idle timeout due to having shuffle data on them. This could result in using more cluster resources and in the worst case if there are no remaining resources on the Kubernetes cluster then Spark could potentially hang. You may consider looking at config spark.dynamicAllocation.shuffleTracking.timeout to set a timeout, but that could result in data having to be recomputed if the shuffle data is really needed.
+Stage level scheduling is supported on Kubernetes:
+- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup.
+- When dynamic allocation is enabled: It allows users to specify task and executor resource requirements at the stage level and will request the extra executors. This also requires spark.dynamicAllocation.shuffleTracking.enabled to be enabled since Kubernetes doesn't support an external shuffle service at this time. The order in which containers for different profiles are requested from Kubernetes is not guaranteed. Note that since dynamic allocation on Kubernetes requires the shuffle tracking feature, this means that executors from previous stages that used a different ResourceProfile may not idle timeout due to having shuffle data on them. This could result in using more cluster resources and in the worst case if there are no remaining resources on the Kubernetes cluster then Spark could potentially hang. You may consider looking at config spark.dynamicAllocation.shuffleTracking.timeout to set a timeout, but that could result in data having to be recomputed if the shuffle data is really needed.
Note, there is a difference in the way pod template resources are handled between the base default profile and custom ResourceProfiles. Any resources specified in the pod template file will only be used with the base default profile. If you create custom ResourceProfiles be sure to include all necessary resources there since the resources from the template file will not be propagated to custom ResourceProfiles.
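For the dynamic-allocation-enabled path described above, a hedged sketch of the relevant configuration follows; the master URL and container image are placeholders, and only the configuration keys come from the text.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder master URL and image; substitute real values for your cluster.
val conf = new SparkConf()
  .setAppName("stage-level-on-k8s")
  .setMaster("k8s://https://kubernetes.example.com:6443")
  .set("spark.kubernetes.container.image", "example-registry/spark:latest")
  // Required on Kubernetes because there is no external shuffle service:
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  // Optional: allow executors that only hold shuffle data to eventually time
  // out, at the cost of possibly recomputing that shuffle data later.
  .set("spark.dynamicAllocation.shuffleTracking.timeout", "1h")

val spark = SparkSession.builder().config(conf).getOrCreate()
```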