Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,34 @@ class DAGScheduler(
missing.toList
}

/**
* Get ancestor splits in ShuffledRDD.
*/
private[spark] def parentSplitsInShuffledRDD(stageId: Int, pId: Int): Option[Map[Int, Set[Int]]] =

@squito squito Apr 12, 2017

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is called both parents and ancestors, which is confusing. I think it would be most accurate to call it the stage parent, and note that requires traversing some distance up the set of RDD ancestors.

Also I commented on this a bit below ... in general this is pretty confusing. It seems like there are a lot of cases which get ignored and I am not certain that's always OK.

Couldn't this entire thing be replaced with

val deps = getShuffleDependencies(rdd)
val partitioner = deps.head.partitioner
// make sure the partitioner is consistent across all our shuffle dependencies.
assert(deps.forall{_.partioner == partitioner})
val allStats = deps.map{mapOutputTracker.getStatistics(_.shuffleId)}
// TODO sum the stats per partition and go from there

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is confusing and I need to refine this.
I'm a little bit hesitant to use getShuffleDependencies. I need to get the total size of input from ShuffledRDD for every child's partition. After transformations like CoalescedRDD, there may be not a consistent one-to-one match between ancestor's partition index and child's partition index.

{
stageIdToStage.get(stageId) match {
case Some(stage) =>
val waitingForVisit = new Stack[Tuple2[RDD[_], Int]]
waitingForVisit.push((stage.rdd, pId))
val ret = new HashMap[Int, HashSet[Int]]()
while(waitingForVisit.nonEmpty) {
val (rdd, split) = waitingForVisit.pop()
rdd.dependencies.foreach {
case dep: ShuffleDependency[_, _, _] =>
ret.getOrElseUpdate(dep.shuffleId, new HashSet[Int]()).add(split)
case dep: NarrowDependency[_] =>
dep.getParents(split).foreach {
case parentSplit =>
waitingForVisit.push((dep.rdd, parentSplit))
}
}
}
Some(ret.mapValues(_.toSet).toMap)
case None =>
None
}
}

/**
* Registers the given jobId among the jobs that need the given stage and
* all of that stage's ancestors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.math.max
Expand Down Expand Up @@ -138,7 +139,7 @@ private[spark] class TaskSetManager(
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
private val allPendingTasks = new ArrayBuffer[Int]
private var allPendingTasks = new ArrayBuffer[Int]

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I make this var, because I don't have a better approach to sort ArrayBuffer in place. Advice?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, unfortunately.


// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
Expand Down Expand Up @@ -168,6 +169,10 @@ private[spark] class TaskSetManager(
t.epoch = epoch
}

private val sortedPendingTasks = new AtomicBoolean(false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of this stuff w/ the delayed sorting etc. is now totally unnecessary. If you've got input sizes, then just sort the tasks when the TSM is created, which avoids a lot of complexity.

Perhaps the tasks should even just get sorted when the TaskSet is created in the first place, and then this doesn't know or care that the tasks have been sorted in any particular way.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, in current change, I do the ordering when create TaskSet, there is no change in TSM now. Thanks a lot for suggestion :)


val taskInputSizeFromShuffledRDD = HashMap[Task[_], Long]()

// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
Expand Down Expand Up @@ -438,6 +443,10 @@ private[spark] class TaskSetManager(
blacklist.isExecutorBlacklistedForTaskSet(execId)
}
if (!isZombie && !offerBlacklisted) {
if (sortedPendingTasks.compareAndSet(false, true)) {
sortPendingTasks()
}

val curTime = clock.getTimeMillis()

var allowedLocality = maxLocality
Expand Down Expand Up @@ -512,6 +521,51 @@ private[spark] class TaskSetManager(
}
}

private[this] def sortPendingTasks(): Unit = {
val taskIndexs = (0 until numTasks).toArray
def ordFunc(x: Int, y: Int): Boolean = {
getTaskInputSizeFromShuffledRDD(tasks(x)) < getTaskInputSizeFromShuffledRDD(tasks(y))
}
if (tasks.nonEmpty) {
// Sort the tasks based on their input size from ShuffledRDD.
pendingTasksForExecutor.foreach {
case (k, v) => pendingTasksForExecutor(k) = v.sortWith(ordFunc)
}
pendingTasksForHost.foreach {
case (k, v) => pendingTasksForHost(k) = v.sortWith(ordFunc)
}
pendingTasksForRack.foreach {
case (k, v) => pendingTasksForRack(k) = v.sortWith(ordFunc)
}
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.sortWith(ordFunc)
allPendingTasks = allPendingTasks.sortWith(ordFunc)
}
}

// Visible for testing
private[spark] def setTaskInputSizeFromShuffledRDD(inputSize: Map[Task[_], Long]) = {
taskInputSizeFromShuffledRDD.clear()
taskInputSizeFromShuffledRDD ++= inputSize
}

private[this] def getTaskInputSizeFromShuffledRDD(task: Task[_]): Long = {
taskInputSizeFromShuffledRDD.get(task) match {
case Some(size) => size
case None =>
val size =
sched.dagScheduler.parentSplitsInShuffledRDD(task.stageId, task.partitionId).map {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the major complaint is this call, we don't want the TSM requesting info from the DAGSCheduler. But you could change that -- instead the DAGScheduler could push this info into the TSM after it has the input sizes. This actually might not be that bad, since in any case the DAGScheduler has to know this info when it calls submitMissingTasks. I don't think this info should change at all after the taskset has been submitted, right, so you'd have it all available at construction time?

case parentSplits =>
parentSplits.map {
case (shuffleId, splits) =>
splits.map(sched.mapOutputTracker.getMapSizesByExecutorId(shuffleId, _)
.flatMap(_._2.map(_._2)).sum).sum
}.sum
}.getOrElse(0L)
taskInputSizeFromShuffledRDD(task) = size
size
}
}

private def maybeFinishTaskSet() {
if (isZombie && runningTasks == 0) {
sched.taskSetFinished(this)
Expand Down Expand Up @@ -833,6 +887,7 @@ private[spark] class TaskSetManager(
s" has already succeeded).")
} else {
addPendingTask(index)
sortPendingTasks()
}

if (!isZombie && reason.countTowardsTaskFailures) {
Expand Down Expand Up @@ -904,6 +959,7 @@ private[spark] class TaskSetManager(
copiesRunning(index) -= 1
tasksSuccessful -= 1
addPendingTask(index)
sortPendingTasks()
// Tell the DAGScheduler that this task was resubmitted so that it doesn't think our
// stage finishes when a total of tasks.size tasks finish.
sched.dagScheduler.taskEnded(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,6 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
}
}


test("TaskSet with no preferences") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
Expand Down Expand Up @@ -1139,6 +1138,19 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
}

test("Schedule tasks based on size of input from ShuffledRDD.") {
sc = new SparkContext("local", "test")
sched = new FakeTaskScheduler(sc)
val taskSet = FakeTask.createTaskSet(4)
val clock = new ManualClock()
val manager = new TaskSetManager(sched, taskSet, 1, clock = clock)
manager.setTaskInputSizeFromShuffledRDD(taskSet.tasks.zip(Seq(1L, 100L, 10000L, 1000L)).toMap)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 2)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 3)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 1)
assert(manager.resourceOffer("exec", "host", ANY).get.index === 0)
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we'd also want a test to make sure the sizes were getting computed correctly. (I think that might be easier to do with the refactor I suggested?)


private def createTaskResult(
id: Int,
accumUpdates: Seq[AccumulatorV2[_, _]] = Seq.empty): DirectTaskResult[Int] = {
Expand Down