[SPARK-20219] Schedule tasks based on size of input from ShuffledRDD #17533
jinxing64 wants to merge 9 commits into apache:master
Conversation
Test build #75529 has started for PR 17533 at commit

Test build #75531 has started for PR 17533 at commit

Test build #75532 has started for PR 17533 at commit
Tasks are scheduled by locality (which includes shuffle tasks too, to some extent).

Yes, I did the test on my cluster. In a highly skewed stage, the time cost can be reduced significantly. Tasks are scheduled with locality preference, but in the current code the input size of tasks is not taken into consideration. Think about this scenario:
In the current code, the tasks are scheduled in serial order, so the task for partition-8 will be the last one to launch and the time cost is 12. This change is related to SPARK-19100. In my prod env, skew happens mostly on ShuffledRDD, so this PR proposes to consider the size of input from ShuffledRDD when scheduling. This change can bring a benefit in skew situations and won't have a negative impact on performance in other scenarios.
  // Set containing all pending tasks (also used as a stack, as above).
- private val allPendingTasks = new ArrayBuffer[Int]
+ private var allPendingTasks = new ArrayBuffer[Int]

I made this a var because I don't have a better approach for sorting an ArrayBuffer in place. Advice?
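One way to avoid the var, as a minimal sketch (the helper name is hypothetical, not from the PR): rebuild the buffer's contents instead of reassigning the reference, so the field can stay a val.

import scala.collection.mutable.ArrayBuffer

// Sort an ArrayBuffer "in place" by producing a sorted copy and refilling the buffer.
def sortBufferInPlace(buf: ArrayBuffer[Int])(implicit ord: Ordering[Int]): Unit = {
  val sorted = buf.sorted  // returns a new, sorted ArrayBuffer
  buf.clear()
  buf ++= sorted
}

val allPendingTasks = new ArrayBuffer[Int]
allPendingTasks ++= Seq(2, 0, 1)
sortBufferInPlace(allPendingTasks)  // allPendingTasks is now ArrayBuffer(0, 1, 2)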
srowen
left a comment
I'm always wary of touching task scheduling and don't feel that comfortable approving it, but the idea is plausible.
  t.epoch = epoch
}

val sortedPendingTasks = new AtomicBoolean(false)
  blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
  if (!sortedPendingTasks.get()) {

I think you need if (sortedPendingTasks.compareAndSet(false, true)), or else the point of the AtomicBoolean is kind of lost.
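A minimal sketch of the suggested pattern (names are illustrative): compareAndSet lets exactly one caller win the race and perform the one-time sort, whereas a plain get()/set() pair leaves a window where two threads could both pass the check.

import java.util.concurrent.atomic.AtomicBoolean

object SortOnceSketch {
  val sortedPendingTasks = new AtomicBoolean(false)

  def sortOnce(sort: () => Unit): Unit = {
    if (sortedPendingTasks.compareAndSet(false, true)) {
      sort()  // runs at most once across all threads
    }
  }
}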
private[this] def sortPendingTasks(): Unit = {
  val taskIndexs = (0 until numTasks).toArray
  implicit def ord = new Ordering[Int] {

Maybe it would be clearer to use sortWith below and pass the ordering explicitly?
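A minimal sketch of that alternative, assuming a hypothetical inputSizeOf(i) that returns the shuffle input bytes for task index i:

def sortedTaskIndexes(numTasks: Int, inputSizeOf: Int => Long): Array[Int] =
  // Largest input first, so the most expensive tasks are launched earliest.
  (0 until numTasks).toArray.sortWith((x, y) => inputSizeOf(x) > inputSizeOf(y))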
// Visible for testing
private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = {
  taskInputSizeFromShuffledRDD.clear()
  inputSize.foreach {

I might be missing something, but isn't this just adding all the entries from one Map to another? Does ++ do this directly?
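A minimal sketch of the reviewer's point, with hypothetical String keys standing in for Task[_]: copying one map into a mutable map is a single ++= call.

import scala.collection.mutable

object CopyMapSketch {
  val taskInputSize = mutable.Map.empty[String, Long]

  def setTaskInputSize(inputSize: Map[String, Long]): Unit = {
    taskInputSize.clear()
    taskInputSize ++= inputSize  // same effect as inputSize.foreach { case (k, v) => taskInputSize(k) = v }
  }
}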
val taskIndexs = (0 until numTasks).toArray
implicit def ord = new Ordering[Int] {
  override def compare(x: Int, y: Int): Int =
    getTaskInputSizeFromShuffledRDD(tasks(x)) compare

Go ahead and use x.compare(y) rather than omitting the explicit syntax.
case Some(size) => size
case None =>
  val size =
    sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId) match {

This might still be clearer as .getOrElse(..... , 0L)

Yes, this should be made clearer. But sorry, I couldn't find a function like getOrElse(func, 0L).

Oh, what I really mean is ...map(parentSplits => ...).getOrElse(0L)
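A minimal sketch of the two equivalent forms being discussed, with a hypothetical Option[Seq[Long]] standing in for the result of parentSplitsInShuffledRDD:

def sizeWithMatch(parentSplits: Option[Seq[Long]]): Long = parentSplits match {
  case Some(splits) => splits.sum
  case None => 0L
}

def sizeWithGetOrElse(parentSplits: Option[Seq[Long]]): Long =
  parentSplits.map(splits => splits.sum).getOrElse(0L)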
Test build #75538 has finished for PR 17533 at commit

Test build #75542 has finished for PR 17533 at commit

Test build #75543 has finished for PR 17533 at commit
I'm hesitant about this and posted some comments on the JIRA (we should try to keep high-level discussion about whether this change makes sense there, so it's easier to reference in the future and not tangled up in the low-level PR comments).

Test build #75547 has finished for PR 17533 at commit
@kayousterhout |
squito
left a comment
@kayousterhout @mridulm what do you think about the refactor I suggested? Maybe that wouldn't really increase the complexity significantly?
@jinxing64 if you're really motivated, you could try it out and see how things look, though no promises yet ...
case Some(size) => size
case None =>
  val size =
    sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId).map {

I think the major complaint is this call: we don't want the TSM requesting info from the DAGScheduler. But you could change that -- instead, the DAGScheduler could push this info into the TSM after it has the input sizes. That actually might not be that bad, since the DAGScheduler has to know this info anyway when it calls submitMissingTasks. I don't think this info should change at all after the task set has been submitted, right? So you'd have it all available at construction time.
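A hypothetical sketch of that "push, don't pull" shape (all names here are illustrative, not the PR's actual classes): the DAGScheduler computes per-partition input sizes before submitting the task set and hands them over at construction time, so the TaskSetManager never calls back into the DAGScheduler.

class TaskSetManagerSketch(
    partitionIds: Seq[Int],
    inputBytesByPartition: Map[Int, Long]) {  // fixed at construction time

  // Largest input first, computed once up front.
  val scheduleOrder: Seq[Int] =
    partitionIds.sortBy(p => -inputBytesByPartition.getOrElse(p, 0L))
}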
assert(manager.resourceOffer("exec", "host", ANY).get.index === 3)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 1)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 0)
}

We'd also want a test to make sure the sizes are getting computed correctly. (I think that might be easier to do with the refactor I suggested?)
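A hypothetical sketch of that extra check (the sizes here are illustrative, not from the PR): given known per-partition input sizes, the computed schedule order should be largest-input-first.

val sizes = Map(0 -> 10L, 1 -> 40L, 2 -> 5L, 3 -> 100L)  // partitionId -> input bytes
val order = sizes.keys.toSeq.sortBy(p => -sizes(p))      // largest first
assert(order == Seq(3, 1, 0, 2))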
@squito
Sorry, I missed this point in the previous change. Now I push the info (size of input from ShuffledRDD) when creating the TSM.

Test build #75634 has finished for PR 17533 at commit

Test build #75695 has started for PR 17533 at commit

Test build #75697 has finished for PR 17533 at commit
squito
left a comment
I think this is a lot more complicated than it needs to be. You should be able to simplify significantly by looking at what the code does for the "map-stage jobs" and how those MapStatistics are used later -- I left a couple of inline comments hinting at that, though I didn't figure out all the details.
fwiw, I'm no longer opposed to this for complicating the relationship between the DAGScheduler & TSM; this version maintains the current separation. Still, I do think that in its current form this is introducing too much complexity. If it can be simplified a lot, then I might be more OK with it.
}

// Visible for testing.
private[spark] def getTaskInputSizesFromShuffledRDD(tasks: Seq[Task[_]]): Map[Task[_], Long] = {

It doesn't look like this needs to be exposed at all for tests. (And if it were used in tests, it could probably be a little tighter as private[scheduler].)
val noPartitionerConflict = rdd.partitioner match {
  case Some(partitioner) =>
    partitioner.isInstanceOf[HashPartitioner] &&
      dep.partitioner.isInstanceOf[HashPartitioner] &&

(a) I don't really understand what is going on here. Why would rdd.partitioner ever be different from one of the shuffle dependencies' partitioners? I thought shuffle dependencies always have to have the same partitioner? (If there is a good reason, this probably needs a comment in the code.)
(b) If this is needed, it can probably just be partitioner == dep.partitioner -- that is simpler, equivalent for HashPartitioner, and allows it to still work for other partitioners as well.

Yes, I always thought rdd.partitioner should be the same as the shuffle dependency's partitioner, but I found that CustomShuffledRDD is a different one.
/**
 * Get ancestor splits in ShuffledRDD.
 */

This is called both "parents" and "ancestors", which is confusing. I think it would be most accurate to call it the stage parent, and note that finding it requires traversing some distance up the set of RDD ancestors.
Also, I commented on this a bit below ... in general this is pretty confusing. It seems like there are a lot of cases which get ignored, and I am not certain that's always OK.
Couldn't this entire thing be replaced with

val deps = getShuffleDependencies(rdd)
val partitioner = deps.head.partitioner
// make sure the partitioner is consistent across all our shuffle dependencies.
assert(deps.forall(_.partitioner == partitioner))
val allStats = deps.map(d => mapOutputTracker.getStatistics(d.shuffleId))
// TODO sum the stats per partition and go from there

Yes, this is confusing and I need to refine it.
I'm a little hesitant to use getShuffleDependencies. I need the total size of input from the ShuffledRDD for every child partition, and after transformations like CoalescedRDD there may not be a consistent one-to-one mapping between the ancestor's partition index and the child's partition index.
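For the TODO in the sketch above, summing the per-partition map-output bytes across all shuffle dependencies could look roughly like this hypothetical helper (it assumes each dependency's statistics expose an Array[Long] of bytes per reduce partition, as MapOutputStatistics.bytesByPartitionId does):

def totalBytesPerPartition(statsPerDep: Seq[Array[Long]]): Array[Long] = {
  require(statsPerDep.nonEmpty && statsPerDep.map(_.length).distinct.size == 1)
  val totals = new Array[Long](statsPerDep.head.length)
  for (stats <- statsPerDep; p <- stats.indices) {
    totals(p) += stats(p)  // add this dependency's bytes for reduce partition p
  }
  totals
}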
parentSplits.map {
  case (shuffleId, splits) =>
    splits.map(mapOutputTracker.getMapSizesByExecutorId(shuffleId, _)
      .flatMap(_._2.map(_._2)).sum).sum

I think you can use mapOutputTracker.getStatistics here.
It also occurs to me that in general this could use the total input size for the task, but I guess Spark isn't looking at that in general yet (though it probably could, from Hadoop's InputSplit.getLength()). Just something to keep in mind.

"It also occurs to me that in general this could use the total input size for the task, but I guess Spark isn't looking at that in general yet (though it probably could, from Hadoop's InputSplit.getLength()). Just something to keep in mind."

Agree :)
  t.epoch = epoch
}

private val sortedPendingTasks = new AtomicBoolean(false)

I think all of this stuff with the delayed sorting etc. is now totally unnecessary. If you've got the input sizes, just sort the tasks when the TSM is created, which avoids a lot of complexity.
Perhaps the tasks should even just get sorted when the TaskSet is created in the first place; then this code doesn't know or care that the tasks have been sorted in any particular way.

Yes, in the current change I do the ordering when creating the TaskSet, so there is no change in the TSM now. Thanks a lot for the suggestion :)
@squito

Test build #75800 has finished for PR 17533 at commit

Test build #75802 has finished for PR 17533 at commit

Hi @jinxing64, how is it going?

@HyukjinKwon |
What changes were proposed in this pull request?
When data is highly skewed on a ShuffledRDD, it makes sense to launch the tasks that process much more input as soon as possible. The current scheduling mechanism in TaskSetManager is quite simple: in a scenario where the "large tasks" sit in the bottom half of the tasks array, launching the tasks with much more input early can significantly reduce the time cost and save resources when "dynamic allocation" is disabled.
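A minimal sketch of the idea (not the PR's actual code; inputBytes is a hypothetical per-task size lookup): order the tasks so that those with the largest shuffle input are offered first.

def orderTasksBySize[T](tasks: Seq[T], inputBytes: T => Long): Seq[T] =
  tasks.sortBy(t => -inputBytes(t))  // largest input first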
How was this patch tested?
Added a unit test in TaskSetManagerSuite.