[SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling. #27207
@@ -543,6 +543,16 @@ package object config {
     .version("1.2.0")
     .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)

+  private[spark] val LEGACY_LOCALITY_WAIT_RESET =
+    ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch")
Contributor
let's make it an internal conf and add the version info.

Contributor (Author)
What does adding internal do? I see internal configs are not exposed when SQLConf.getAllDefinedConfs is called.

Contributor
internal means we can remove it later. Ideally all legacy configs should be internal. we can set version as

Contributor
yes, please add the .doc and .version - it would be nice to get this into 3.0.0.

Contributor
all the configs are end-user facing (users can set them and they will take effect). Internal configs are for special use cases only (most users should not set them).
+      .doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " +
+        "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " +
+        "documentation for more details.")
+      .internal()
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
     .version("0.5.0")
     .timeConf(TimeUnit.MILLISECONDS)
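For reference, the new key is an internal, opt-in flag. A minimal sketch of how an application would switch back to the legacy reset behavior; only the legacyResetOnTaskLaunch key comes from this PR, while the app name, master, and the explicit spark.locality.wait value are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: opt back into the legacy behavior that resets the locality wait
// timer every time a task is launched. Only the legacyResetOnTaskLaunch key
// comes from this PR; the other settings are placeholders.
val conf = new SparkConf()
  .setAppName("legacy-locality-wait-demo")
  .setMaster("local[4]")
  .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
  .set("spark.locality.wait", "3s") // the existing per-level wait, default 3s

val sc = new SparkContext(conf)
```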
@@ -36,7 +36,7 @@ import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Utils}

 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -57,11 +57,24 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
  * * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
  * scheduling
  * * task-result-getter threads
+ *
+ * Delay Scheduling:
+ * Delay scheduling is an optimization that sacrifices job fairness for data locality in order to
+ * improve cluster and workload throughput. One useful definition of "delay" is how much time
+ * has passed since the TaskSet was using its fair share of resources. Since it is impractical to
+ * calculate this delay without a full simulation, the heuristic used is the time since the
+ * TaskSetManager last launched a task and has not rejected any resources due to delay scheduling
+ * since it was last offered its "fair share". A "fair share" offer is when [[resourceOffers]]'s
+ * parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource
+ * is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]).
+ * The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task,
+ * and can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true.
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    clock: Clock = new SystemClock)
   extends TaskScheduler with Logging {

   import TaskSchedulerImpl._
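To make the heuristic in the new class doc concrete, here is an illustrative, self-contained model of the reset decision; the names TimerState and shouldResetTimer are invented for this sketch and do not appear in the patch:

```scala
// Illustrative-only model of the delay-timer reset rule described in the
// class doc above; none of these names exist in Spark itself.
// A brand-new TaskSet is treated as if its timer was reset on the previous
// offer (mirroring the map's getOrElse default in the patch).
final case class TimerState(resetOnLastOffer: Boolean = true)

def shouldResetTimer(
    launchedAnyTask: Boolean,
    hadDelayScheduleReject: Boolean,
    isAllFreeResources: Boolean,
    state: TimerState): (Boolean, TimerState) = {
  if (launchedAnyTask && !hadDelayScheduleReject) {
    // Either this was a "fair share" offer, or every offer since the last
    // fair-share offer also reset the timer, so the TaskSet is not being
    // starved by delay scheduling and the timer can restart.
    val reset = isAllFreeResources || state.resetOnLastOffer
    (reset, TimerState(resetOnLastOffer = reset))
  } else {
    // A delay-scheduling reject (or no launch at all) means the TaskSet is
    // waiting on locality, so the accumulated delay must be preserved.
    (false, TimerState(resetOnLastOffer = false))
  }
}
```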
@@ -97,6 +110,11 @@ private[spark] class TaskSchedulerImpl(
   // on this class. Protected by `this`
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

+  // keyed by taskset
+  // value is true if the task set's locality wait timer was reset on the last resource offer
+  private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
+  private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
+
   // Protected by `this`
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   // Protected by `this`
@@ -125,7 +143,6 @@ private[spark] class TaskSchedulerImpl(
   protected val executorIdToHost = new HashMap[String, String]

   private val abortTimer = new Timer(true)
-  private val clock = new SystemClock
   // Exposed for testing
   val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]

@@ -319,20 +336,38 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
+    resetOnPreviousOffer -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
   }

+  /**
+   * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]].
+   *
+   * @param taskSet task set manager to offer resources to
+   * @param maxLocality max locality to allow when scheduling
+   * @param shuffledOffers shuffled resource offers to use for scheduling,
+   *                       remaining resources are tracked by below fields as tasks are scheduled
+   * @param availableCpus remaining cpus per offer,
+   *                      value at index 'i' corresponds to shuffledOffers[i]
+   * @param availableResources remaining resources per offer,
+   *                           value at index 'i' corresponds to shuffledOffers[i]
+   * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i]
+   * @param addressesWithDescs tasks scheduled per host:port, used for barrier tasks
+   * @return tuple of (had delay schedule rejects?, option of min locality of launched task)
+   */
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
       shuffledOffers: Seq[WorkerOffer],
       availableCpus: Array[Int],
       availableResources: Array[Map[String, Buffer[String]]],
       tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
-      addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
-    var launchedTask = false
+      addressesWithDescs: ArrayBuffer[(String, TaskDescription)])
+    : (Boolean, Option[TaskLocality]) = {
+    var noDelayScheduleRejects = true
+    var minLaunchedLocality: Option[TaskLocality] = None
     // nodes and executors that are blacklisted for the entire application have already been
     // filtered out by this point
     for (i <- 0 until shuffledOffers.size) {
@@ -348,11 +383,14 @@ private[spark] class TaskSchedulerImpl(
           try {
             val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
             val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
-            val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality,
-              taskResAssignments)
+            val (taskDescOption, didReject) =
+              taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments)
+            noDelayScheduleRejects &= !didReject
             for (task <- taskDescOption) {
               tasks(i) += task
               val tid = task.taskId
+              val locality = taskSet.taskInfos(task.taskId).taskLocality
+              minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality))
               taskIdToTaskSetManager.put(tid, taskSet)
               taskIdToExecutorId(tid) = execId
               executorIdToRunningTaskIds(execId).add(tid)
@@ -372,19 +410,18 @@ private[spark] class TaskSchedulerImpl(
                 // The executor address is expected to be non empty.
                 addressesWithDescs += (shuffledOffers(i).address.get -> task)
               }
-              launchedTask = true
             }
           } catch {
             case e: TaskNotSerializableException =>
               logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
               // Do not offer resources for this task, but don't throw an error to allow other
               // task sets to be submitted.
-              return launchedTask
+              return (noDelayScheduleRejects, minLaunchedLocality)
           }
         }
       }
     }
-    launchedTask
+    (noDelayScheduleRejects, minLaunchedLocality)
   }

   /**
@@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl(
     }.sum
   }

+  private def minTaskLocality(
+      l1: Option[TaskLocality],
+      l2: Option[TaskLocality]) : Option[TaskLocality] = {
+    if (l1.isEmpty) {
+      l2
+    } else if (l2.isEmpty) {
+      l1
+    } else if (l1.get < l2.get) {
+      l1
+    } else {
+      l2
+    }
+  }
+
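As a usage note, TaskLocality is an ordered enumeration (PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY), so the helper above picks the more local of two levels. A standalone sketch that mirrors its behavior; minTaskLocality itself is private to TaskSchedulerImpl, so the minLocality helper below is re-declared purely for illustration:

```scala
import org.apache.spark.scheduler.TaskLocality
import org.apache.spark.scheduler.TaskLocality.TaskLocality

// Mirrors the private minTaskLocality helper, for illustration only.
// TaskLocality values are ordered from most to least local:
// PROCESS_LOCAL < NODE_LOCAL < NO_PREF < RACK_LOCAL < ANY.
def minLocality(l1: Option[TaskLocality], l2: Option[TaskLocality]): Option[TaskLocality] =
  (l1, l2) match {
    case (Some(a), Some(b)) => Some(if (a < b) a else b)
    case _ => l1.orElse(l2)
  }

// The "min" of a NODE_LOCAL and an ANY launch is NODE_LOCAL, i.e. the most
// local level at which any task was launched in this round of offers.
assert(minLocality(Some(TaskLocality.NODE_LOCAL), Some(TaskLocality.ANY))
  .contains(TaskLocality.NODE_LOCAL))
assert(minLocality(None, Some(TaskLocality.RACK_LOCAL)).contains(TaskLocality.RACK_LOCAL))
```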
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
Member
Can we update the description of this function and explain the parameter "isAllFreeResources"?

Contributor (Author)
yes, I can add something like: if true, then the parameter offers contains all workers and their free resources. See delay scheduling comments in class description.

Contributor
if there are changes suggested and required, please file a separate new jira for this and link them. this pr has been merged and we have had too many followups at this point.
    */
-  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(
+      offers: IndexedSeq[WorkerOffer],
+      isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
Member
Do we need to set the default value?

Contributor (Author)
I think I originally did this to not break the API and to keep something closer to the previous behavior for callers that hadn't migrated to setting it to false.
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false
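Following up on the thread above, a rough sketch of how a caller could distinguish the two kinds of offers. The backend class below is a simplified stand-in (assumed to live in the org.apache.spark.scheduler package), not the actual CoarseGrainedSchedulerBackend change; only the resourceOffers signature comes from this diff:

```scala
package org.apache.spark.scheduler

// Simplified, hypothetical stand-in for a backend driving TaskSchedulerImpl.
private[spark] class ToySchedulerBackend(scheduler: TaskSchedulerImpl) {

  // Periodic revival: the offers describe every worker and all of its free
  // resources, so this is a "fair share" offer and may reset the delay timer.
  def reviveAllOffers(allWorkers: IndexedSeq[WorkerOffer]): Unit = {
    scheduler.resourceOffers(allWorkers, isAllFreeResources = true)
  }

  // A single executor just freed some cores (e.g. a task finished): this is
  // only a partial view of the cluster, so it must not count as a fair-share
  // offer on its own.
  def offerSingleExecutor(worker: WorkerOffer): Unit = {
    scheduler.resourceOffers(IndexedSeq(worker), isAllFreeResources = false)
  }
}
```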
@@ -544,18 +597,34 @@ private[spark] class TaskSchedulerImpl(
             s"number of available slots is $numBarrierSlotsAvailable.")
         } else {
           var launchedAnyTask = false
+          var noDelaySchedulingRejects = true
+          var globalMinLocality: Option[TaskLocality] = None
           // Record all the executor IDs assigned barrier tasks on.
           val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
           for (currentMaxLocality <- taskSet.myLocalityLevels) {
             var launchedTaskAtCurrentMaxLocality = false
             do {
-              launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
-                currentMaxLocality, shuffledOffers, availableCpus,
+              val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
+                taskSet, currentMaxLocality, shuffledOffers, availableCpus,
                 availableResources, tasks, addressesWithDescs)
+              launchedTaskAtCurrentMaxLocality = minLocality.isDefined
               launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+              noDelaySchedulingRejects &= noDelayScheduleReject
+              globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
             } while (launchedTaskAtCurrentMaxLocality)
           }

+          if (!legacyLocalityWaitReset) {
+            if (noDelaySchedulingRejects && launchedAnyTask) {
+              if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
+                taskSet.resetDelayScheduleTimer(globalMinLocality)
+                resetOnPreviousOffer.update(taskSet.taskSet, true)
+              }
+            } else {
+              resetOnPreviousOffer.update(taskSet.taskSet, false)
+            }
+          }
+
           if (!launchedAnyTask) {
             taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
               // If the taskSet is unschedulable we try to find an existing idle blacklisted