Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ private[spark] class Pool(
}
}

override def isSchedulable: Boolean = true

override def addSchedulable(schedulable: Schedulable): Unit = {
require(schedulable != null)
schedulableQueue.add(schedulable)
Expand Down Expand Up @@ -105,7 +107,7 @@ private[spark] class Pool(
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
}
sortedTaskSetQueue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private[spark] trait Schedulable {
def stageId: Int
def name: String

def isSchedulable: Boolean
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ private[spark] class TaskSchedulerImpl(
val availableResources = shuffledOffers.map(_.resources).toArray
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,9 @@ private[spark] class TaskSetManager(
null
}

override def isSchedulable: Boolean = !isZombie &&
(pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty)

override def addSchedulable(schedulable: Schedulable): Unit = {}

override def removeSchedulable(schedulable: Schedulable): Unit = {}
Expand Down