12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -37,24 +37,24 @@ private[spark] class Pool(

val schedulableQueue = new ConcurrentLinkedQueue[Schedulable]
val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
val weight = initWeight
Member: These appear to be pretty much redundant then, if they're just set once to another existing variable's value?

Member Author: Pool extends the Schedulable trait and needs to override weight. It is also used for the taskToWeightRatio calculation at the FairSchedulingAlgorithm level.
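To make that concrete, here is a minimal sketch of how a pool's weight can feed into the fair-scheduling comparison. It is illustrative only, not the actual FairSchedulingAlgorithm code; it assumes it sits alongside the Schedulable and SchedulingAlgorithm traits in org.apache.spark.scheduler, and the SketchFairSchedulingAlgorithm name is made up for this example.

```scala
// Illustrative sketch, not the real Spark implementation: schedulables below
// their minShare are served first; otherwise the one with fewer running tasks
// per unit of weight wins. This is where Pool's overridden `weight` is read.
class SketchFairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val s1Needy = s1.runningTasks < s1.minShare
    val s2Needy = s2.runningTasks < s2.minShare
    // Tasks-per-weight ratio: lower means the schedulable is under-served.
    val taskToWeightRatio1 = s1.runningTasks.toDouble / math.max(s1.weight, 1)
    val taskToWeightRatio2 = s2.runningTasks.toDouble / math.max(s2.weight, 1)
    if (s1Needy && !s2Needy) true
    else if (!s1Needy && s2Needy) false
    else taskToWeightRatio1 < taskToWeightRatio2
  }
}
```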

val minShare = initMinShare
var runningTasks = 0
var priority = 0
val priority = 0

// A pool's stage id is used to break the tie in scheduling.
var stageId = -1
var name = poolName
val name = poolName
var parent: Pool = null

var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
private val taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case _ =>
val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
val msg = s"Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
throw new IllegalArgumentException(msg)
}
}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -59,6 +59,8 @@ private[spark] class TaskSchedulerImpl private[scheduler](
extends TaskScheduler with Logging
{

import TaskSchedulerImpl._

def this(sc: SparkContext) = {
this(
sc,
@@ -130,17 +132,18 @@ private[spark] class TaskSchedulerImpl private[scheduler](

val mapOutputTracker = SparkEnv.get.mapOutputTracker

var schedulableBuilder: SchedulableBuilder = null
var rootPool: Pool = null
private var schedulableBuilder: SchedulableBuilder = null
// default scheduler is FIFO
private val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
val schedulingMode: SchedulingMode = try {
SchedulingMode.withName(schedulingModeConf.toUpperCase)
} catch {
case e: java.util.NoSuchElementException =>
throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
}

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

@@ -150,16 +153,15 @@ private[spark] class TaskSchedulerImpl private[scheduler](

def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
}
}
schedulableBuilder.buildPools()
@@ -683,6 +685,9 @@ private[spark] class TaskSchedulerImpl private[scheduler](


private[spark] object TaskSchedulerImpl {

val SCHEDULER_MODE_PROPERTY = "spark.scheduler.mode"

/**
* Used to balance containers across hosts.
*
36 changes: 19 additions & 17 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -78,24 +78,24 @@ private[spark] class TaskSetManager(
private val numFailures = new Array[Int](numTasks)

val taskAttempts = Array.fill[List[TaskInfo]](numTasks)(Nil)
var tasksSuccessful = 0
private[scheduler] var tasksSuccessful = 0

var weight = 1
var minShare = 0
val weight = 1
val minShare = 0
var priority = taskSet.priority
var stageId = taskSet.stageId
val name = "TaskSet_" + taskSet.id
var parent: Pool = null
var totalResultSize = 0L
var calculatedTasks = 0
private var totalResultSize = 0L
private var calculatedTasks = 0

private[scheduler] val taskSetBlacklistHelperOpt: Option[TaskSetBlacklist] = {
blacklistTracker.map { _ =>
new TaskSetBlacklist(conf, stageId, clock)
}
}

val runningTasksSet = new HashSet[Long]
private[scheduler] val runningTasksSet = new HashSet[Long]

override def runningTasks: Int = runningTasksSet.size

@@ -105,7 +105,7 @@ private[spark] class TaskSetManager(
// state until all tasks have finished running; we keep TaskSetManagers that are in the zombie
// state in order to continue to track and account for the running tasks.
// TODO: We should kill any running task attempts when the task set manager becomes a zombie.
var isZombie = false
private[scheduler] var isZombie = false

// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
@@ -129,17 +129,17 @@ private[spark] class TaskSetManager(
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
val allPendingTasks = new ArrayBuffer[Int]
private val allPendingTasks = new ArrayBuffer[Int]

// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
val speculatableTasks = new HashSet[Int]
private[scheduler] val speculatableTasks = new HashSet[Int]

// Task index, start and finish time for each task attempt (indexed by task ID)
val taskInfos = new HashMap[Long, TaskInfo]
private val taskInfos = new HashMap[Long, TaskInfo]

// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
@@ -148,7 +148,7 @@ private[spark] class TaskSetManager(
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
// was printed. This should ideally be an LRU map that can drop old exceptions automatically.
val recentExceptions = HashMap[String, (Int, Long)]()
private val recentExceptions = HashMap[String, (Int, Long)]()

// Figure out the current map output tracker epoch and set it on all tasks
val epoch = sched.mapOutputTracker.getEpoch
@@ -169,20 +169,22 @@
* This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
* PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
*/
var myLocalityLevels = computeValidLocalityLevels()
var localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level
private[scheduler] var myLocalityLevels = computeValidLocalityLevels()

// Time to wait at each level
private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)

// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level

override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null

override def schedulingMode: SchedulingMode = SchedulingMode.NONE

var emittedTaskSizeWarning = false
private[scheduler] var emittedTaskSizeWarning = false

/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -110,8 +110,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val cancelledStages = new HashSet[Int]()

val taskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start() = {}
override def stop() = {}
override def executorHeartbeatReceived(
@@ -542,8 +542,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
// make sure that the DAGScheduler doesn't crash when the TaskScheduler
// doesn't implement killTask()
val noKillTaskScheduler = new TaskScheduler() {
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start(): Unit = {}
override def stop(): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {
@@ -73,8 +73,8 @@ private class DummySchedulerBackend extends SchedulerBackend {

private class DummyTaskScheduler extends TaskScheduler {
var initialized = false
override def rootPool: Pool = null
override def schedulingMode: SchedulingMode = SchedulingMode.NONE
override def schedulingMode: SchedulingMode = SchedulingMode.FIFO
override def rootPool: Pool = new Pool("", schedulingMode, 0, 0)
override def start(): Unit = {}
override def stop(): Unit = {}
override def submitTasks(taskSet: TaskSet): Unit = {}
core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -286,6 +286,12 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
assert(testPool.getSchedulableByName(taskSetManager.name) === taskSetManager)
}

test("Pool should throw IllegalArgumentException when schedulingMode is not supported") {
intercept[IllegalArgumentException] {
new Pool("TestPool", SchedulingMode.NONE, 0, 1)
}
}

private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
val selectedPool = rootPool.getSchedulableByName(poolName)
core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -75,9 +75,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B

def setupScheduler(confs: (String, String)*): TaskSchedulerImpl = {
val conf = new SparkConf().setMaster("local").setAppName("TaskSchedulerImplSuite")
confs.foreach { case (k, v) =>
conf.set(k, v)
}
confs.foreach { case (k, v) => conf.set(k, v) }
sc = new SparkContext(conf)
taskScheduler = new TaskSchedulerImpl(sc)
setupHelper()
@@ -904,4 +902,12 @@
assert(taskDescs.size === 1)
assert(taskDescs.head.executorId === "exec2")
}

test("TaskScheduler should throw IllegalArgumentException when schedulingMode is not supported") {
intercept[IllegalArgumentException] {
val taskScheduler = setupScheduler(
TaskSchedulerImpl.SCHEDULER_MODE_PROPERTY -> SchedulingMode.NONE.toString)
taskScheduler.initialize(new FakeSchedulerBackend)
}
}
}