Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package org.apache.spark.deploy
import java.net.URI

import org.apache.spark.resource.{ResourceProfile, ResourceRequirement, ResourceUtils}
import org.apache.spark.resource.ResourceProfile.getCustomExecutorResources

private[spark] case class ApplicationDescription(
name: String,
Expand All @@ -40,7 +39,7 @@ private[spark] case class ApplicationDescription(
def coresPerExecutor: Option[Int] = defaultProfile.getExecutorCores
def resourceReqsPerExecutor: Seq[ResourceRequirement] =
ResourceUtils.executorResourceRequestToRequirement(
getCustomExecutorResources(defaultProfile).values.toSeq.sortBy(_.resourceName))
defaultProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))

override def toString: String = "ApplicationDescription(" + name + ")"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.spark.deploy.ApplicationDescription
import org.apache.spark.resource.{ResourceInformation, ResourceProfile, ResourceUtils}
import org.apache.spark.resource.ResourceProfile.{getCustomExecutorResources, DEFAULT_RESOURCE_PROFILE_ID}
import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -101,7 +101,7 @@ private[spark] class ApplicationInfo(
.map(_.toInt)
.getOrElse(defaultMemoryMbPerExecutor)
val customResources = ResourceUtils.executorResourceRequestToRequirement(
getCustomExecutorResources(resourceProfile).values.toSeq.sortBy(_.resourceName))
resourceProfile.getCustomExecutorResources().values.toSeq.sortBy(_.resourceName))

rpIdToResourceDesc(resourceProfile.id) =
ExecutorResourceDescription(coresPerExecutor, memoryMbPerExecutor, customResources)
Expand Down
63 changes: 48 additions & 15 deletions core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, SparkException}
import org.apache.spark.annotation.{Evolving, Since}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
Expand Down Expand Up @@ -94,6 +94,15 @@ class ResourceProfile(
executorResources.get(ResourceProfile.MEMORY).map(_.amount)
}

/**
 * Task resource requests of this profile, excluding the entry keyed by
 * `ResourceProfile.CPUS`.
 *
 * @return a map from resource name to its task request for every remaining resource
 */
private[spark] def getCustomTaskResources(): Map[String, TaskResourceRequest] = {
  taskResources.filter { case (resourceName, _) => resourceName != ResourceProfile.CPUS }
}

/**
 * Executor resource requests of this profile whose names are not listed in
 * `ResourceProfile.allSupportedExecutorResources`.
 *
 * @return a map from resource name to its executor request for every remaining resource
 */
protected[spark] def getCustomExecutorResources(): Map[String, ExecutorResourceRequest] = {
  executorResources.filterNot { case (resourceName, _) =>
    ResourceProfile.allSupportedExecutorResources.contains(resourceName)
  }
}

/*
* This function takes into account fractional amounts for the task resource requirement.
* Spark only supports fractional amounts < 1 to basically allow for multiple tasks
Expand Down Expand Up @@ -182,8 +191,8 @@ class ResourceProfile(
val numPartsPerResourceMap = new mutable.HashMap[String, Int]
numPartsPerResourceMap(ResourceProfile.CORES) = 1
val taskResourcesToCheck = new mutable.HashMap[String, TaskResourceRequest]
taskResourcesToCheck ++= ResourceProfile.getCustomTaskResources(this)
val execResourceToCheck = ResourceProfile.getCustomExecutorResources(this)
taskResourcesToCheck ++= this.getCustomTaskResources()
val execResourceToCheck = this.getCustomExecutorResources()
execResourceToCheck.foreach { case (rName, execReq) =>
val taskReq = taskResources.get(rName).map(_.amount).getOrElse(0.0)
numPartsPerResourceMap(rName) = 1
Expand Down Expand Up @@ -242,7 +251,8 @@ class ResourceProfile(

// check that the task resources and executor resources are equal, but id's could be different
private[spark] def resourcesEqual(rp: ResourceProfile): Boolean = {
rp.taskResources == taskResources && rp.executorResources == executorResources
rp.taskResources == taskResources && rp.executorResources == executorResources &&
rp.getClass == this.getClass
}

override def hashCode(): Int = Seq(taskResources, executorResources).hashCode()
Expand All @@ -253,6 +263,40 @@ class ResourceProfile(
}
}

/**
* Resource profile which only contains task resources. It can be used for stage-level task
* scheduling. When dynamic allocation is disabled, tasks are scheduled to executors with the
* default resource profile, based on the task resources described by this task resource profile.
* When dynamic allocation is enabled, new executors are requested for this profile based on
* the default executor resources requested at startup, and tasks are assigned only to executors
* created with this resource profile.
*
* @param taskResources Resource requests for tasks. Mapped from the resource
* name (e.g., cores, memory, CPU) to its specific request.
*/
@Evolving
@Since("3.4.0")
private[spark] class TaskResourceProfile(
override val taskResources: Map[String, TaskResourceRequest])
extends ResourceProfile(Map.empty, taskResources) {

override protected[spark] def getCustomExecutorResources()
: Map[String, ExecutorResourceRequest] = {
if (SparkEnv.get == null) {
// This will be called in the standalone master when dynamic allocation is enabled.
return super.getCustomExecutorResources()
}

val sparkConf = SparkEnv.get.conf
if (!Utils.isDynamicAllocationEnabled(sparkConf)) {
ResourceProfile.getOrCreateDefaultProfile(sparkConf)
.getCustomExecutorResources()
} else {
super.getCustomExecutorResources()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

super.getCustomExecutorResources() returns empty?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it does. That's for compatibility when dynamic allocation is enabled.

}
}
}

object ResourceProfile extends Logging {
// task resources
/**
Expand Down Expand Up @@ -393,17 +437,6 @@ object ResourceProfile extends Logging {
}
}

private[spark] def getCustomTaskResources(
rp: ResourceProfile): Map[String, TaskResourceRequest] = {
rp.taskResources.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
}

private[spark] def getCustomExecutorResources(
rp: ResourceProfile): Map[String, ExecutorResourceRequest] = {
rp.executorResources.
filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
}

/*
* Get the number of cpus per task if its set in the profile, otherwise return the
* cpus per task for the default profile.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,11 @@ class ResourceProfileBuilder() {
}

def build(): ResourceProfile = {
new ResourceProfile(executorResources, taskResources)
if (_executorResources.isEmpty) {
new TaskResourceProfile(taskResources)
} else {
new ResourceProfile(executorResources, taskResources)
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -59,35 +59,67 @@ private[spark] class ResourceProfileManager(sparkConf: SparkConf,
private val testExceptionThrown = sparkConf.get(RESOURCE_PROFILE_MANAGER_TESTING)

/**
* If we use anything except the default profile, it's only supported on YARN and Kubernetes
* with dynamic allocation enabled. Throw an exception if not supported.
* If we use anything except the default profile, it's supported on YARN, Kubernetes and
* Standalone with dynamic allocation enabled, and task resource profile with dynamic allocation
* disabled on Standalone. Throw an exception if not supported.
*/
private[spark] def isSupported(rp: ResourceProfile): Boolean = {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled
// We want the exception to be thrown only when we are specifically testing for the
// exception or in a real application. Otherwise in all other testing scenarios we want
// to skip throwing the exception so that we can test in other modes to make testing easier.
if ((notRunningUnitTests || testExceptionThrown) &&
if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean TaskResourceProfile can be used when dynamic allocation is enabled? And in that case, TaskResourceProfile seems to never meet the requirement (i.e. taskRpId == executorRpId) in canBeScheduled() (except for the 1st created TaskResourceProfile), right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Ngone51, thanks for the feedback, and for your concerns:

  1. TaskResourceProfile can be used when dynamic allocation is enabled.
  2. When dynamic allocation is enabled, TaskResourceProfile will be treated as a normal ResourceProfile with no specific executor resource requirements, and the dynamic allocation manager will also request executors for TaskResourceProfile, so we'll have executors matching the same resource profile id. The behavior will be the same as what we have on the master branch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it can be used with dynamic allocation; in that case it uses the default resource profile's executor resources, but it must acquire new executors. The TaskResourceProfile gets a unique rpid just like a standard resource profile, and it should go through the same path to get executors via dynamic allocation as a normal ResourceProfile (i.e., the stage being submitted kicks it off). Is there something I'm not thinking about here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

...default resource profile executor resources but it must acquire new executors.

So it should be the default resource profile executor resources but not the default rp id? Then, it makes sense to me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, when dynamic allocation is enabled, it is just like a normal resource profile with a unique id, requesting executors based on default executor resources requirement.

if ((notRunningUnitTests || testExceptionThrown) && !isStandalone) {
throw new SparkException("TaskResourceProfiles are only supported for Standalone " +
"cluster for now when dynamic allocation is disabled.")
}
} else {
val isNotDefaultProfile = rp.id != ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
val notYarnOrK8sOrStandaloneAndNotDefaultProfile =
isNotDefaultProfile && !(isYarn || isK8s || isStandalone)
val YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile =
isNotDefaultProfile && (isYarn || isK8s || isStandalone) && !dynamicEnabled

// We want the exception to be thrown only when we are specifically testing for the
// exception or in a real application. Otherwise in all other testing scenarios we want
// to skip throwing the exception so that we can test in other modes to make testing easier.
if ((notRunningUnitTests || testExceptionThrown) &&
(notYarnOrK8sOrStandaloneAndNotDefaultProfile ||
YarnOrK8sOrStandaloneNotDynAllocAndNotDefaultProfile)) {
throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
"and Standalone with dynamic allocation enabled.")
}
throw new SparkException("ResourceProfiles are only supported on YARN and Kubernetes " +
"and Standalone with dynamic allocation enabled.")
}

if (isStandalone && rp.getExecutorCores.isEmpty &&
sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
"is explicitly set, you may get more executors allocated than expected. It's recommended " +
"to set executor cores explicitly. Please check SPARK-30299 for more details.")
if (isStandalone && dynamicEnabled && rp.getExecutorCores.isEmpty &&
sparkConf.getOption(config.EXECUTOR_CORES.key).isEmpty) {
logWarning("Neither executor cores is set for resource profile, nor spark.executor.cores " +
"is explicitly set, you may get more executors allocated than expected. " +
"It's recommended to set executor cores explicitly. " +
"Please check SPARK-30299 for more details.")
}
}

true
}

/**
 * Decide whether a task set using resource profile `taskRpId` may run on an executor
 * that was registered with resource profile `executorRpId`.
 *
 * The decision follows these rules:
 *  1. Identical profile ids can always be scheduled together.
 *  2. With dynamic allocation disabled, a task whose profile is a [[TaskResourceProfile]]
 *     is accepted regardless of the executor's profile id. This is intended to let such
 *     tasks reuse executors created with the default resource profile.
 *  3. Every other combination is rejected.
 *
 * @param taskRpId resource profile id of the task set; must be known to this manager
 * @param executorRpId resource profile id of the executor; must be known to this manager
 * @return true if the task may be scheduled on the executor
 */
private[spark] def canBeScheduled(taskRpId: Int, executorRpId: Int): Boolean = {
  // The check stays inside assert so it is skipped together with the assertion when elided.
  assert(
    Seq(taskRpId, executorRpId).forall(id => resourceProfileIdToResourceProfile.contains(id)),
    "Tasks and executors must have valid resource profile id")
  // Resolved up front, before any comparison, exactly like the id lookup it replaces.
  val taskProfile = resourceProfileFromId(taskRpId)

  if (taskRpId == executorRpId) {
    true
  } else if (dynamicEnabled) {
    // With dynamic allocation only an exact profile match is acceptable.
    false
  } else {
    taskProfile match {
      case _: TaskResourceProfile => true
      case _ => false
    }
  }
}

def addResourceProfile(rp: ResourceProfile): Unit = {
isSupported(rp)
var putNewProfile = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ private[spark] object ResourceUtils extends Logging {
val fileAllocated = parseAllocated(resourcesFileOpt, componentName)
val fileAllocResMap = fileAllocated.map(a => (a.id.resourceName, a.toResourceInformation)).toMap
// only want to look at the ResourceProfile for resources not in the resources file
val execReq = ResourceProfile.getCustomExecutorResources(resourceProfile)
val execReq = resourceProfile.getCustomExecutorResources()
val filteredExecreq = execReq.filterNot { case (rname, _) => fileAllocResMap.contains(rname) }
val rpAllocations = filteredExecreq.map { case (rName, execRequest) =>
val resourceId = new ResourceID(componentName, rName)
Expand Down Expand Up @@ -444,8 +444,8 @@ private[spark] object ResourceUtils extends Logging {
maxTaskPerExec = numTasksPerExecCores
}
}
val taskReq = ResourceProfile.getCustomTaskResources(rp)
val execReq = ResourceProfile.getCustomExecutorResources(rp)
val taskReq = rp.getCustomTaskResources()
val execReq = rp.getCustomExecutorResources()

if (limitingResource.nonEmpty && !limitingResource.equals(ResourceProfile.CPUS)) {
if ((taskCpus * maxTaskPerExec) < cores) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.spark.network.shuffle.protocol.MergeStatuses
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd.{RDD, RDDCheckpointData}
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.resource.{ResourceProfile, TaskResourceProfile}
import org.apache.spark.resource.ResourceProfile.{DEFAULT_RESOURCE_PROFILE_ID, EXECUTOR_CORES_LOCAL_PROPERTY, PYSPARK_MEMORY_LOCAL_PROPERTY}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.storage._
Expand Down Expand Up @@ -592,7 +592,12 @@ private[spark] class DAGScheduler(
if (x.amount > v.amount) x else v).getOrElse(v)
k -> larger
}
new ResourceProfile(mergedExecReq, mergedTaskReq)

if (mergedExecReq.isEmpty) {
new TaskResourceProfile(mergedTaskReq)
} else {
new ResourceProfile(mergedExecReq, mergedTaskReq)
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,10 @@ private[spark] class TaskSchedulerImpl(
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
val taskSetRpID = taskSet.taskSet.resourceProfileId
// make the resource profile id a hard requirement for now - ie only put tasksets
// on executors where resource profile exactly matches.
if (taskSetRpID == shuffledOffers(i).resourceProfileId) {

// check whether the task can be scheduled to the executor based on resource profile.
if (sc.resourceProfileManager
.canBeScheduled(taskSetRpID, shuffledOffers(i).resourceProfileId)) {
val taskResAssignmentsOpt = resourcesMeetTaskRequirements(taskSet, availableCpus(i),
availableResources(i))
taskResAssignmentsOpt.foreach { taskResAssignments =>
Expand Down Expand Up @@ -463,7 +464,7 @@ private[spark] class TaskSchedulerImpl(
// check if the ResourceProfile has cpus first since that is common case
if (availCpus < taskCpus) return None
// only look at the resources other than cpus
val tsResources = ResourceProfile.getCustomTaskResources(taskSetProf)
val tsResources = taskSetProf.getCustomTaskResources()
if (tsResources.isEmpty) return Some(Map.empty)
val localTaskReqAssign = HashMap[String, ResourceInformation]()
// we go through all resources here so that we can make sure they match and also get what the
Expand Down Expand Up @@ -1222,13 +1223,13 @@ private[spark] object TaskSchedulerImpl {

/**
* Calculate the max available task slots given the `availableCpus` and `availableResources`
* from a collection of ResourceProfiles. And only those ResourceProfiles who has the
* same id with the `rpId` can be used to calculate the task slots.
* from a collection of ResourceProfiles. And only those ResourceProfiles who can be assigned
* tasks with the `rpId` can be used to calculate the task slots.
*
* @param scheduler the TaskSchedulerImpl instance
* @param conf SparkConf used to calculate the limiting resource and get the cpu amount per task
* @param rpId the target ResourceProfile id. Only those ResourceProfiles who has the same id
* with it can be used to calculate the task slots.
* @param rpId the ResourceProfile id for the task set. Only those ResourceProfiles who can be
* assigned with the tasks can be used to calculate the task slots.
* @param availableRPIds an Array of ids of the available ResourceProfiles from the executors.
* @param availableCpus an Array of the amount of available cpus from the executors.
* @param availableResources an Array of the resources map from the executors. In the resource
Expand Down Expand Up @@ -1257,7 +1258,7 @@ private[spark] object TaskSchedulerImpl {
val taskLimit = resourceProfile.taskResources.get(limitingResource).map(_.amount).get

availableCpus.zip(availableResources).zip(availableRPIds)
.filter { case (_, id) => id == rpId }
.filter { case (_, id) => scheduler.sc.resourceProfileManager.canBeScheduled(rpId, id) }
.map { case ((cpu, resources), _) =>
val numTasksPerExecCores = cpu / cpusPerTask
if (limitedByCpu) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,43 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
assert(rpmanager.isSupported(immrprof))
}

test("isSupported task resource profiles with dynamic allocation disabled") {
  // Standalone master, dynamic allocation off, and the testing flag set so that
  // isSupported throws for unsupported profiles even though this is a unit test.
  val conf = new SparkConf()
    .setMaster("spark://foo")
    .set(EXECUTOR_CORES, 4)
    .set(DYN_ALLOCATION_ENABLED, false)
    .set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")

  val standaloneManager = new ResourceProfileManager(conf, listenerBus)
  // The default profile must always be accepted.
  assert(standaloneManager.isSupported(standaloneManager.defaultResourceProfile))

  // A task-only profile is accepted on a standalone master.
  val gpuPerTask = new TaskResourceRequests().resource("gpu", 1)
  val taskOnlyProfile = new TaskResourceProfile(gpuPerTask.requests)
  assert(standaloneManager.isSupported(taskOnlyProfile))

  // Switch the same conf to local mode; a manager built from it must reject the profile.
  conf.setMaster("local")
  val localManager = new ResourceProfileManager(conf, listenerBus)
  val thrown = intercept[SparkException] {
    localManager.isSupported(taskOnlyProfile)
  }
  val expectedMessage = "TaskResourceProfiles are only supported for Standalone " +
    "cluster for now when dynamic allocation is disabled."
  assert(thrown.getMessage === expectedMessage)
}

test("isSupported task resource profiles with dynamic allocation enabled") {
  // Standalone master with dynamic allocation on; the testing flag keeps the
  // exception path active under unit tests.
  val conf = new SparkConf()
    .setMaster("spark://foo")
    .set(EXECUTOR_CORES, 4)
    .set(DYN_ALLOCATION_ENABLED, true)
    .set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
  val manager = new ResourceProfileManager(conf, listenerBus)

  // A task-only profile must be accepted in this configuration.
  val gpuPerTask = new TaskResourceRequests().resource("gpu", 1)
  val taskOnlyProfile = new TaskResourceProfile(gpuPerTask.requests)
  assert(manager.isSupported(taskOnlyProfile))
}

test("isSupported with local mode") {
val conf = new SparkConf().setMaster("local").set(EXECUTOR_CORES, 4)
conf.set(RESOURCE_PROFILE_MANAGER_TESTING.key, "true")
Expand Down
Loading