diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 39c2af0184643..67d0d851b60fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -20,7 +20,6 @@ package org.apache.spark.deploy import java.net.URI import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils} -import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources private[spark] case class ApplicationDescription( name: String, @@ -40,7 +39,7 @@ private[spark] case class ApplicationDescription( def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores def resourceReqsPerExecutor: Seq[ResourceRequirement] = ResourceUtils.executorResourceRequestToRequirement( - getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName)) + defaultProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName)) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index a2926ca64bcc3..e66933b84af55 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.deploy.ApplicationDescription import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils} -import org.apache.spark.resource.ResourceProfile.{getCustomExecutorResources, DEFAULT_RESOURCE_PROFILE_ID} +import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID import org.apache.spark.rpc.RpcEndpointRef import org.apache.spark.util.Utils @@ -101,7 +101,7 @@ private[spark] class ApplicationInfo( .map(_.toInt) .getOrElse(defaultMemoryMbPerExecutor) val customResources = ResourceUtils.executorResourceRequestToRequirement( - getCustomExecutorResources(resourceProfile).values.toSeq.sortBy(_.resourceName)) + resourceProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName)) rpIdToResourceDesc(resourceProfile.id) = ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, customResources) diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala index 5e02c61459d1d..afd612433a7e1 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala @@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.JavaConverters._ import scala.collection.mutable -import org.apache.spark.{SparkConf, SparkContext, SparkException} +import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException} import org.apache.spark.annotation.{Evolving, Since} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ @@ -94,6 +94,15 @@ class ResourceProfile( executorResources.get(ResourceProfile.MEMORY).map(_.amount) } + private[spark] def getCustomTaskResources(): Map[String, TaskResourceRequest] = { + taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap + } + + protected[spark] def getCustomExecutorResources(): Map[String, 
ExecutorResourceRequest] = { + executorResources. + filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap + } + /* * This function takes into account fractional amounts for the task resource requirement. * Spark only supports fractional amounts < 1 to basically allow for multiple tasks @@ -182,8 +191,8 @@ class ResourceProfile( val numPartsPerResourceMap = new mutable.HashMap[String, Int] numPartsPerResourceMap(ResourceProfile.CORES) = 1 val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest] - taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this) - val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this) + taskResourcesToCheck ++= this.getCustomTaskResources() + val execResourceToCheck = this.getCustomExecutorResources() execResourceToCheck.foreach { case (rName, execReq) => val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0) numPartsPerResourceMap(rName) = 1 @@ -242,7 +251,8 @@ class ResourceProfile( // check that the task resources and executor resources are equal, but id's could be different private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = { - rp.taskResources == taskResources && rp.executorResources == executorResources + rp.taskResources == taskResources && rp.executorResources == executorResources && + rp.getClass == this.getClass } override def hashCode(): Int = Seq(taskResources, executorResources).hashCode() @@ -253,6 +263,40 @@ class ResourceProfile( } } +/** + * A resource profile that contains only task resources; used for stage-level task scheduling. + * When dynamic allocation is disabled, tasks with this profile are scheduled onto executors + * that have the default resource profile, based on the task resources this profile describes. + * When dynamic allocation is enabled, new executors are requested for this profile based on + * the default executor resources configured at startup, and tasks are assigned only to + * executors created with this resource profile. + * + * @param taskResources Resource requests for tasks. Mapped from the resource + * name (e.g., cores, memory, CPU) to its specific request. + */ +@Evolving +@Since("3.4.0") +private[spark] class TaskResourceProfile( + override val taskResources: Map[String, TaskResourceRequest]) + extends ResourceProfile(Map.empty, taskResources) { + + override protected[spark] def getCustomExecutorResources() + : Map[String, ExecutorResourceRequest] = { + if (SparkEnv.get == null) { + // This is called in the standalone master when dynamic allocation is enabled. + return super.getCustomExecutorResources() + } + + val sparkConf = SparkEnv.get.conf + if (!Utils.isDynamicAllocationEnabled(sparkConf)) { + ResourceProfile.getOrCreateDefaultProfile(sparkConf) + .getCustomExecutorResources() + } else { + super.getCustomExecutorResources() + } + } +} + object ResourceProfile extends Logging { // task resources /** @@ -393,17 +437,6 @@ object ResourceProfile extends Logging { } } - private[spark] def getCustomTaskResources( - rp: ResourceProfile): Map[String, TaskResourceRequest] = { - rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap - } - - private[spark] def getCustomExecutorResources( - rp: ResourceProfile): Map[String, ExecutorResourceRequest] = { - rp.executorResources. - filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap - } - /* * Get the number of cpus per task if its set in the profile, otherwise return the * cpus per task for the default profile.
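As a quick illustration of the refactor above (the two helpers moved from the companion object onto the ResourceProfile instance), here is a minimal sketch of what they return. This is editorial commentary, not part of the patch; it assumes code compiled inside the org.apache.spark package, since getCustomTaskResources is private[spark] and getCustomExecutorResources is protected[spark], and the GPU discovery script path is a placeholder.

import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Build a profile that mixes built-in resources with one custom resource ("gpu").
val ereqs = new ExecutorResourceRequests().cores(4).memory("4g")
  .resource("gpu", 2, "/opt/spark/getGpus.sh")
val treqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(ereqs).require(treqs).build()

// cores and memory are in allSupportedExecutorResources, so only "gpu" survives the filter.
rp.getCustomExecutorResources().keySet // Set(gpu)
// cpus is the built-in task resource, so again only "gpu" is returned.
rp.getCustomTaskResources().keySet // Set(gpu)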
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala index f6b30d327375e..584ff32b4475a 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileBuilder.scala @@ -93,7 +93,11 @@ class ResourceProfileBuilder() { } def build(): ResourceProfile = { - new ResourceProfile(executorResources, taskResources) + if (_executorResources.isEmpty) { + new TaskResourceProfile(taskResources) + } else { + new ResourceProfile(executorResources, taskResources) + } } } diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala index 489d9c3e85817..3f48aaded5c2e 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfileManager.scala @@ -59,35 +59,67 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf, private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING) /** - * If we use anything except the default profile, it's only supported on YARN and Kubernetes - * with dynamic allocation enabled. Throw an exception if not supported. + * If we use anything except the default profile, it's supported on YARN, Kubernetes and + * Standalone with dynamic allocation enabled, and on Standalone with dynamic allocation + * disabled when the profile is a TaskResourceProfile. Throw an exception if not supported. */ private[spark] def isSupported(rp: ResourceProfile): Boolean = { - val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID - val notYarnOrK8sOrStandaloneAndNotDefaultProfile = - isNotDefaultProfile && !(isYarn || isK8s || isStandalone) - val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile = - isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled - // We want the exception to be thrown only when we are specifically testing for the - // exception or in a real application. Otherwise in all other testing scenarios we want - // to skip throwing the exception so that we can test in other modes to make testing easier. - if ((notRunningUnitTests || testExceptionThrown) && + if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) { + if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) { + throw new SparkException("TaskResourceProfiles are only supported for Standalone " + + "cluster for now when dynamic allocation is disabled.") + } + } else { + val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID + val notYarnOrK8sOrStandaloneAndNotDefaultProfile = + isNotDefaultProfile && !(isYarn || isK8s || isStandalone) + val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile = + isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled + + // We want the exception to be thrown only when we are specifically testing for the + // exception or in a real application. Otherwise in all other testing scenarios we want + // to skip throwing the exception so that we can test in other modes to make testing easier.
+ if ((notRunningUnitTests || testExceptionThrown) && (notYarnOrK8sOrStandaloneAndNotDefaultProfile || YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) { - throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " + - "and Standalone with dynamic allocation enabled.") - } + throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " + + "and Standalone with dynamic allocation enabled.") + } - if (isStandalone && rp.getExecutorCores.isEmpty && - sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) { - logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " + - "is explicitly set, you may get more executors allocated than expected. It's recommended " + - "to set executor cores explicitly. Please check SPARK-30299 for more details.") + if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty && + sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) { + logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " + + "is explicitly set, you may get more executors allocated than expected. " + + "It's recommended to set executor cores explicitly. " + + "Please check SPARK-30299 for more details.") + } } true } + /** + * Check whether a task with a specific taskRpId can be scheduled to executors + * with executorRpId. + * + * Here are the rules: + * 1. When dynamic allocation is disabled, only [[TaskResourceProfile]] is supported, + * and tasks with a [[TaskResourceProfile]] can be scheduled to executors with the default + * resource profile. + * 2. For other scenarios (when dynamic allocation is enabled), tasks can be scheduled to + * executors whose resource profile exactly matches. + */ + private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean = { + assert(resourceProfileIdToResourceProfile.contains(taskRpId) && + resourceProfileIdToResourceProfile.contains(executorRpId), + "Tasks and executors must have valid resource profile id") + val taskRp = resourceProfileFromId(taskRpId) + + // When dynamic allocation is disabled, tasks with a TaskResourceProfile can always reuse + // all the executors with the default resource profile.
+ taskRpId == executorRpId || (!dynamicEnabled && taskRp.isInstanceOf[TaskResourceProfile]) + } + def addResourceProfile(rp: ResourceProfile): Unit = { isSupported(rp) var putNewProfile = false diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala index 58b37269be4f8..0e18ecf0e5118 100644 --- a/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala +++ b/core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala @@ -356,7 +356,7 @@ private[spark] object ResourceUtils extends Logging { val fileAllocated = parseAllocated(resourcesFileOpt, componentName) val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap // only want to look at the ResourceProfile for resources not in the resources file - val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile) + val execReq = resourceProfile.getCustomExecutorResources() val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) } val rpAllocations = filteredExecreq.map { case (rName, execRequest) => val resourceId = new ResourceID(componentName, rName) @@ -444,8 +444,8 @@ private[spark] object ResourceUtils extends Logging { maxTaskPerExec = numTasksPerExecCores } } - val taskReq = ResourceProfile.getCustomTaskResources(rp) - val execReq = ResourceProfile.getCustomExecutorResources(rp) + val taskReq = rp.getCustomTaskResources() + val execReq = rp.getCustomExecutorResources() if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS)) { if ((taskCpus * maxTaskPerExec) < cores) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 475afd01d00c7..86786e64cedcf 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -44,7 +44,7 @@ import org.apache.spark.network.shuffle.protocol.MergeStatuses import org.apache.spark.network.util.JavaUtils import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult} import org.apache.spark.rdd.{RDD, RDDCheckpointData} -import org.apache.spark.resource.ResourceProfile +import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile} import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY} import org.apache.spark.rpc.RpcTimeout import org.apache.spark.storage._ @@ -592,7 +592,12 @@ private[spark] class DAGScheduler( if (x.amount > v.amount) x else v).getOrElse(v) k -> larger } - new ResourceProfile(mergedExecReq, mergedTaskReq) + + if (mergedExecReq.isEmpty) { + new TaskResourceProfile(mergedTaskReq) + } else { + new ResourceProfile(mergedExecReq, mergedTaskReq) + } } /** diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d3e27a94e2944..a6735f380f18e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -388,9 +388,10 @@ private[spark] class TaskSchedulerImpl( val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host val taskSetRpID = taskSet.taskSet.resourceProfileId - // make the resource profile id a hard requirement for now - ie only put tasksets 
- // on executors where resource profile exactly matches. - if (taskSetRpID == shuffledOffers(i).resourceProfileId) { + + // check whether the task can be scheduled to the executor based on the resource profile. + if (sc.resourceProfileManager + .canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId)) { val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i), availableResources(i)) taskResAssignmentsOpt.foreach { taskResAssignments => @@ -463,7 +464,7 @@ private[spark] class TaskSchedulerImpl( // check if the ResourceProfile has cpus first since that is common case if (availCpus < taskCpus) return None // only look at the resource other then cpus - val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf) + val tsResources = taskSetProf.getCustomTaskResources() if (tsResources.isEmpty) return Some(Map.empty) val localTaskReqAssign = HashMap[String, ResourceInformation]() // we go through all resources here so that we can make sure they match and also get what the @@ -1222,13 +1223,13 @@ private[spark] object TaskSchedulerImpl { /** * Calculate the max available task slots given the `availableCpus` and `availableResources` - * from a collection of ResourceProfiles. And only those ResourceProfiles who has the - * same id with the `rpId` can be used to calculate the task slots. + * from a collection of ResourceProfiles. Only those ResourceProfiles that can be assigned + * tasks with the `rpId` are used to calculate the task slots. * * @param scheduler the TaskSchedulerImpl instance * @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task - * @param rpId the target ResourceProfile id. Only those ResourceProfiles who has the same id - * with it can be used to calculate the task slots. + * @param rpId the ResourceProfile id for the task set. Only those ResourceProfiles that can be + * assigned the tasks are used to calculate the task slots. * @param availableRPIds an Array of ids of the available ResourceProfiles from the executors. * @param availableCpus an Array of the amount of available cpus from the executors. * @param availableResources an Array of the resources map from the executors.
In the resource @@ -1257,7 +1258,7 @@ private[spark] object TaskSchedulerImpl { val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get availableCpus.zip(availableResources).zip(availableRPIds) - .filter { case (_, id) => id == rpId } + .filter { case (_, id) => scheduler.sc.resourceProfileManager.canBeScheduled(rpId, id) } .map { case ((cpu, resources), _) => val numTasksPerExecCores = cpu / cpusPerTask if (limitedByCpu) { diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala index aa0081356097f..e97d5c7883aa8 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala @@ -116,6 +116,43 @@ class ResourceProfileManagerSuite extends SparkFunSuite { assert(rpmanager.isSupported(immrprof)) } + test("isSupported task resource profiles with dynamic allocation disabled") { + val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4) + conf.set(DYN_ALLOCATION_ENABLED, false) + conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true") + + var rpmanager = new ResourceProfileManager(conf, listenerBus) + // default profile should always work + val defaultProf = rpmanager.defaultResourceProfile + assert(rpmanager.isSupported(defaultProf)) + + // task resource profile. + val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1) + val taskProf = new TaskResourceProfile(gpuTaskReq.requests) + assert(rpmanager.isSupported(taskProf)) + + conf.setMaster("local") + rpmanager = new ResourceProfileManager(conf, listenerBus) + val error = intercept[SparkException] { + rpmanager.isSupported(taskProf) + }.getMessage + assert(error === "TaskResourceProfiles are only supported for Standalone " + + "cluster for now when dynamic allocation is disabled.") + } + + test("isSupported task resource profiles with dynamic allocation enabled") { + val conf = new SparkConf().setMaster("spark://foo").set(EXECUTOR_CORES, 4) + conf.set(DYN_ALLOCATION_ENABLED, true) + conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true") + + val rpmanager = new ResourceProfileManager(conf, listenerBus) + + // task resource profile. 
+ val gpuTaskReq = new TaskResourceRequests().resource("gpu", 1) + val taskProf = new TaskResourceProfile(gpuTaskReq.requests) + assert(rpmanager.isSupported(taskProf)) + } + test("isSupported with local mode") { val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4) conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true") diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala index 6c36f5c855547..d07b85847e77c 100644 --- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala +++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileSuite.scala @@ -17,12 +17,15 @@ package org.apache.spark.resource -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.mockito.Mockito.when +import org.scalatestplus.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} import org.apache.spark.internal.config._ import org.apache.spark.internal.config.Python.PYSPARK_EXECUTOR_MEMORY import org.apache.spark.resource.TestResourceIDs._ -class ResourceProfileSuite extends SparkFunSuite { +class ResourceProfileSuite extends SparkFunSuite with MockitoSugar { override def beforeAll(): Unit = { try { @@ -190,6 +193,33 @@ class ResourceProfileSuite extends SparkFunSuite { assert(immrprof.isCoresLimitKnown == true) } + test("tasks and limit resource for task resource profile") { + val sparkConf = new SparkConf().setMaster("spark://testing") + .set(EXECUTOR_CORES, 2) + .set("spark.dynamicAllocation.enabled", "false") + .set("spark.executor.resource.gpu.amount", "2") + .set("spark.executor.resource.gpu.discoveryScript", "myscript") + + withMockSparkEnv(sparkConf) { + val rpBuilder1 = new ResourceProfileBuilder() + val rp1 = rpBuilder1 + .require(new TaskResourceRequests().resource("gpu", 1)) + .build() + assert(rp1.isInstanceOf[TaskResourceProfile]) + assert(rp1.limitingResource(sparkConf) == ResourceProfile.CPUS) + assert(rp1.maxTasksPerExecutor(sparkConf) == 2) + assert(rp1.isCoresLimitKnown) + + val rpBuilder2 = new ResourceProfileBuilder() + val rp2 = rpBuilder2 + .require(new TaskResourceRequests().resource("gpu", 2)) + .build() + assert(rp2.isInstanceOf[TaskResourceProfile]) + assert(rp2.limitingResource(sparkConf) == "gpu") + assert(rp2.maxTasksPerExecutor(sparkConf) == 1) + assert(rp2.isCoresLimitKnown) + } + } test("Create ResourceProfile") { val rprof = new ResourceProfileBuilder() @@ -257,6 +287,22 @@ class ResourceProfileSuite extends SparkFunSuite { assert(rprof.resourcesEqual(rprof2), "resource profile resourcesEqual not working") } + test("test TaskResourceProfiles equal") { + val rprofBuilder = new ResourceProfileBuilder() + val taskReq = new TaskResourceRequests().resource("gpu", 1) + rprofBuilder.require(taskReq) + val rprof = rprofBuilder.build() + + val taskReq1 = new TaskResourceRequests().resource("gpu", 1) + val rprof1 = new ResourceProfile(Map.empty, taskReq1.requests) + assert(!rprof.resourcesEqual(rprof1), + "resource profiles having different types should not be equal") + + val taskReq2 = new TaskResourceRequests().resource("gpu", 1) + val rprof2 = new TaskResourceProfile(taskReq2.requests) + assert(rprof.resourcesEqual(rprof2), "task resource profile resourcesEqual not working") + } + test("Test ExecutorResourceRequests memory helpers") { val rprof = new ResourceProfileBuilder() val ereqs = new ExecutorResourceRequests() @@ -314,7 +360,7 @@ class ResourceProfileSuite extends SparkFunSuite { // Update this if new
resource type added assert(ResourceProfile.allSupportedExecutorResources.size === 5, "Executor resources should have 5 supported resources") - assert(ResourceProfile.getCustomExecutorResources(rprof.build).size === 1, + assert(rprof.build().getCustomExecutorResources().size === 1, "Executor resources should have 1 custom resource") } @@ -327,7 +373,18 @@ class ResourceProfileSuite extends SparkFunSuite { .memoryOverhead("2048").pysparkMemory("1024").offHeapMemory("3072") rprof.require(taskReq).require(eReq) - assert(ResourceProfile.getCustomTaskResources(rprof.build).size === 1, + assert(rprof.build().getCustomTaskResources().size === 1, "Task resources should have 1 custom resource") } + + private def withMockSparkEnv(conf: SparkConf)(f: => Unit): Unit = { + val previousEnv = SparkEnv.get + val mockEnv = mock[SparkEnv] + when(mockEnv.conf).thenReturn(conf) + SparkEnv.set(mockEnv) + + try f finally { + SparkEnv.set(previousEnv) + } + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 10cd136d56408..26a7d71eefed9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -40,7 +40,7 @@ import org.apache.spark.internal.config import org.apache.spark.internal.config.Tests import org.apache.spark.network.shuffle.ExternalBlockStoreClient import org.apache.spark.rdd.{DeterministicLevel, RDD} -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceRequests} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, ResourceProfileBuilder, TaskResourceProfile, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils.{FPGA, GPU} import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.scheduler.local.LocalSchedulerBackend @@ -3424,6 +3424,23 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti assert(mergedRp.getExecutorCores.get == 4) } + test("test merge task resource profiles") { + conf.set(config.RESOURCE_PROFILE_MERGE_CONFLICTS.key, "true") + // Ensure the initialization of SparkEnv + sc + + val treqs1 = new TaskResourceRequests().cpus(1) + val rp1 = new TaskResourceProfile(treqs1.requests) + val treqs2 = new TaskResourceRequests().cpus(1) + val rp2 = new TaskResourceProfile(treqs2.requests) + val treqs3 = new TaskResourceRequests().cpus(2) + val rp3 = new TaskResourceProfile(treqs3.requests) + val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3)) + + assert(mergedRp.isInstanceOf[TaskResourceProfile]) + assert(mergedRp.getTaskCpus.get == 2) + } + /** * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that * getShuffleDependenciesAndResourceProfiles correctly returns the direct shuffle dependencies diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala index 869a72324376b..4e9e9755e8502 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala @@ -33,7 +33,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark._ import org.apache.spark.internal.config -import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, 
TaskResourceRequests} +import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfile, TaskResourceProfile, TaskResourceRequests} import org.apache.spark.resource.ResourceUtils._ import org.apache.spark.resource.TestResourceIDs._ import org.apache.spark.util.{Clock, ManualClock, ThreadUtils} @@ -1833,6 +1833,101 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext assert(2 == taskDescriptions.head.resources(GPU).addresses.size) } + test("Scheduler works with task resource profiles") { + val taskCpus = 1 + val taskGpus = 1 + val executorGpus = 4 + val executorCpus = 4 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + + val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + val taskSet = FakeTask.createTaskSet(3) + val rpTaskSet = FakeTask.createTaskSet(5, stageId = 1, stageAttemptId = 0, + priority = 0, rpId = rp.id) + + val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3")) + val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7")) + + val workerOffers = + IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0), + WorkerOffer("executor1", "host1", 4, None, resources1)) + + taskScheduler.submitTasks(taskSet) + taskScheduler.submitTasks(rpTaskSet) + // should have 3 for default profile and 2 for additional resource profile + var taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten + assert(5 === taskDescriptions.length) + var has2Gpus = 0 + var has1Gpu = 0 + for (tDesc <- taskDescriptions) { + assert(tDesc.resources.contains(GPU)) + if (tDesc.resources(GPU).addresses.size == 2) { + has2Gpus += 1 + } + if (tDesc.resources(GPU).addresses.size == 1) { + has1Gpu += 1 + } + } + assert(has2Gpus == 2) + assert(has1Gpu == 3) + + val resources3 = Map(GPU -> ArrayBuffer("8", "9", "10", "11")) + + // clear the first 2 worker offers so they don't have any room and add a third + // for the resource profile + val workerOffers3 = IndexedSeq( + WorkerOffer("executor0", "host0", 0, None, Map.empty), + WorkerOffer("executor1", "host1", 0, None, Map.empty), + WorkerOffer("executor2", "host2", 4, None, resources3)) + taskDescriptions = taskScheduler.resourceOffers(workerOffers3).flatten + assert(2 === taskDescriptions.length) + assert(taskDescriptions.head.resources.contains(GPU)) + assert(2 == taskDescriptions.head.resources(GPU).addresses.size) + } + + test("Calculate available task slots for task resource profiles") { + val taskCpus = 1 + val taskGpus = 1 + val executorGpus = 4 + val executorCpus = 4 + + val taskScheduler = setupScheduler(numCores = executorCpus, + config.CPUS_PER_TASK.key -> taskCpus.toString, + TASK_GPU_ID.amountConf -> taskGpus.toString, + EXECUTOR_GPU_ID.amountConf -> executorGpus.toString, + config.EXECUTOR_CORES.key -> executorCpus.toString + ) + + val treqs = new TaskResourceRequests().cpus(2).resource(GPU, 2) + val rp = new TaskResourceProfile(treqs.requests) + taskScheduler.sc.resourceProfileManager.addResourceProfile(rp) + + val resources0 = Map(GPU -> ArrayBuffer("0", "1", "2", "3")) + val resources1 = Map(GPU -> ArrayBuffer("4", "5", "6", "7")) + + val workerOffers = + IndexedSeq(WorkerOffer("executor0", "host0", 4, None, resources0), + WorkerOffer("executor1",
"host1", 4, None, resources1)) + val availableResourcesAmount = workerOffers.map(_.resources).map { resourceMap => + // available addresses already takes into account if there are fractional + // task resource requests + resourceMap.map { case (name, addresses) => (name, addresses.length) } + } + + val taskSlotsForRp = TaskSchedulerImpl.calculateAvailableSlots( + taskScheduler, taskScheduler.conf, rp.id, workerOffers.map(_.resourceProfileId).toArray, + workerOffers.map(_.cores).toArray, availableResourcesAmount.toArray) + assert(taskSlotsForRp === 4) + } + private def setupSchedulerForDecommissionTests(clock: Clock, numTasks: Int): TaskSchedulerImpl = { // one task per host val numHosts = numTasks diff --git a/docs/configuration.md b/docs/configuration.md index 55e595ad30158..ffd36209e2da7 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -3243,9 +3243,9 @@ See your cluster manager specific page for requirements and details on each of - # Stage Level Scheduling Overview The stage level scheduling feature allows users to specify task and executor resource requirements at the stage level. This allows for different stages to run with executors that have different resources. A prime example of this is one ETL stage runs with executors with just CPUs, the next stage is an ML stage that needs GPUs. Stage level scheduling allows for user to request different executors that have GPUs when the ML stage runs rather then having to acquire executors with GPUs at the start of the application and them be idle while the ETL stage is being run. -This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details. +This is only available for the RDD API in Scala, Java, and Python. It is available on YARN, Kubernetes and Standalone when dynamic allocation is enabled. When dynamic allocation is disabled, it allows users to specify different task resource requirements at stage level, and this is supported on Standalone cluster right now. See the [YARN](running-on-yarn.html#stage-level-scheduling-overview) page or [Kubernetes](running-on-kubernetes.html#stage-level-scheduling-overview) page or [Standalone](spark-standalone.html#stage-level-scheduling-overview) page for more implementation details. -See the `RDD.withResources` and `ResourceProfileBuilder` API's for using this feature. The current implementation acquires new executors for each `ResourceProfile` created and currently has to be an exact match. Spark does not try to fit tasks into an executor that require a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more then 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources. 
+See the `RDD.withResources` and `ResourceProfileBuilder` APIs for using this feature. When dynamic allocation is disabled, tasks with different task resource requirements will share executors created with the `DEFAULT_RESOURCE_PROFILE`. When dynamic allocation is enabled, the current implementation acquires new executors for each `ResourceProfile` created, and the match currently has to be exact. Spark does not try to fit tasks into an executor that requires a different ResourceProfile than the executor was created with. Executors that are not in use will idle timeout with the dynamic allocation logic. The default configuration for this feature is to only allow one ResourceProfile per stage. If the user associates more than 1 ResourceProfile to an RDD, Spark will throw an exception by default. See config `spark.scheduler.resource.profileMergeConflicts` to control that behavior. The current merge strategy Spark implements when `spark.scheduler.resource.profileMergeConflicts` is enabled is a simple max of each resource within the conflicting ResourceProfiles. Spark will create a new ResourceProfile with the max of each of the resources. # Push-based shuffle overview diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 559e3bca6c934..b431752f166be 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -467,7 +467,9 @@ worker during one single schedule iteration. # Stage Level Scheduling Overview -Stage level scheduling is supported on Standalone when dynamic allocation is enabled. Currently, when the Master allocates executors for one application, it will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with smaller id will be scheduled firstly. Normally this won’t matter as Spark finishes one stage before starting another one, the only case this might have an affect is in a job server type scenario, so its something to keep in mind. For scheduling, we will only take executor memory and executor cores from built-in executor resources and all other custom resources from a ResourceProfile, other built-in executor resources such as offHeap and memoryOverhead won't take any effect. The base default profile will be created based on the spark configs when you submit an application. Executor memory and executor cores from the base default profile can be propagated to custom ResourceProfiles, but all other custom resources can not be propagated. +Stage level scheduling is supported on Standalone: +- When dynamic allocation is disabled: It allows users to specify different task resource requirements at the stage level and will use the same executors requested at startup. +- When dynamic allocation is enabled: Currently, when the Master allocates executors for one application, it will schedule based on the order of the ResourceProfile ids for multiple ResourceProfiles. The ResourceProfile with the smaller id will be scheduled first. Normally this won’t matter, as Spark finishes one stage before starting another; the only case this might have an effect is in a job server type scenario, so it's something to keep in mind. For scheduling, we will only take executor memory and executor cores from the built-in executor resources, plus all custom resources from a ResourceProfile; other built-in executor resources such as offHeap and memoryOverhead won't take any effect. The base default profile will be created based on the spark configs when you submit an application.
Executor memory and executor cores from the base default profile can be propagated to custom ResourceProfiles, but all other custom resources cannot be propagated.
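To make the two documentation sections above concrete, here is a hedged end-to-end sketch of the standalone workflow with dynamic allocation disabled. This is editorial commentary, not part of the patch; the master URL, resource amounts, and discovery script path are placeholders.

// Submitted against a standalone master with dynamic allocation disabled, e.g.:
//   spark-shell --master spark://master:7077 \
//     --conf spark.dynamicAllocation.enabled=false \
//     --conf spark.executor.cores=4 \
//     --conf spark.executor.resource.gpu.amount=2 \
//     --conf spark.executor.resource.gpu.discoveryScript=/opt/spark/getGpus.sh \
//     --conf spark.task.resource.gpu.amount=1
import org.apache.spark.resource.{ResourceProfileBuilder, TaskResourceRequests}

// Only task requirements are supplied, so build() yields a TaskResourceProfile and
// the stage below is scheduled onto the existing default-profile executors.
val treqs = new TaskResourceRequests().cpus(2).resource("gpu", 1)
val rp = new ResourceProfileBuilder().require(treqs).build()

sc.range(0, 100, 1, 10)
  .withResources(rp)
  .map(i => i * 2)
  .collect()

With these placeholder settings each default executor has 4 cores and 2 GPUs, so the 2-CPU, 1-GPU tasks above run two at a time per executor.

## Caveats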