diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
index 174b73221afc..60194f8e6cf5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala
@@ -33,7 +33,8 @@ private[spark] class Pool(
     val poolName: String,
     val schedulingMode: SchedulingMode,
     initMinShare: Int,
-    initWeight: Int)
+    initWeight: Int,
+    initMaxRunning: Int)
   extends Schedulable
   with Logging {
 
@@ -41,6 +42,9 @@ private[spark] class Pool(
   val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
   var weight = initWeight
   var minShare = initMinShare
+  // Caps how many queued schedulables getSortedTaskSetQueue considers at once.
+  // Values <= 0 mean "no limit".
+  val maxRunning = initMaxRunning
   var runningTasks = 0
   var priority = 0
 
@@ -97,8 +101,17 @@ private[spark] class Pool(
 
   override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
     var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
+    // Only the first maxRunning schedulables (in queue order) are eligible for scheduling.
+    // The check is "> 0" rather than "!= 0" so that a negative value means "no limit"
+    // instead of take(negative) returning nothing and silently starving the pool.
+    val eligibleSchedulables =
+      if (maxRunning > 0) {
+        schedulableQueue.take(maxRunning).toSeq
+      } else {
+        schedulableQueue.toSeq
+      }
     val sortedSchedulableQueue =
-      schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
+      eligibleSchedulables.sortWith(taskSetSchedulingAlgorithm.comparator)
     for (schedulable <- sortedSchedulableQueue) {
       sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
index a87ef030e69c..f1f2e3f50913 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Schedulable.scala
@@ -34,6 +34,8 @@ private[spark] trait Schedulable {
   def schedulingMode: SchedulingMode
   def weight: Int
   def minShare: Int
+  // Limit on how many child schedulables are considered for scheduling; <= 0 means no limit.
+  def maxRunning: Int
   def runningTasks: Int
   def priority: Int
   def stageId: Int
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 6c5827f75e63..f4d699b0c566 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -60,11 +60,14 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val MINIMUM_SHARES_PROPERTY = "minShare"
   val SCHEDULING_MODE_PROPERTY = "schedulingMode"
   val WEIGHT_PROPERTY = "weight"
+  val MAXIMUM_RUNNING_PROPERTY = "maxRunning"
   val POOL_NAME_PROPERTY = "@name"
   val POOLS_PROPERTY = "pool"
   val DEFAULT_SCHEDULING_MODE = SchedulingMode.FIFO
   val DEFAULT_MINIMUM_SHARE = 0
   val DEFAULT_WEIGHT = 1
+  // A maxRunning of 0 leaves the pool unlimited.
+  val DEFAULT_MAXIMUM_RUNNING = 0
 
   override def buildPools() {
     var is: Option[InputStream] = None
@@ -89,10 +92,11 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   private def buildDefaultPool() {
     if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
-        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING)
       rootPool.addSchedulable(pool)
-      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
-        DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
+      logInfo(("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d, " +
+        "maxRunning: %d").format(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
+        DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING))
     }
   }
 
@@ -104,6 +108,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
       var schedulingMode = DEFAULT_SCHEDULING_MODE
       var minShare = DEFAULT_MINIMUM_SHARE
       var weight = DEFAULT_WEIGHT
+      var maxRunning = DEFAULT_MAXIMUM_RUNNING
 
       val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
       if (xmlSchedulingMode != "") {
@@ -125,7 +130,12 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
         weight = xmlWeight.toInt
       }
 
-      val pool = new Pool(poolName, schedulingMode, minShare, weight)
+      val xmlMaxRunning = (poolNode \ MAXIMUM_RUNNING_PROPERTY).text
+      if (xmlMaxRunning != "") {
+        maxRunning = xmlMaxRunning.toInt
+      }
+
+      val pool = new Pool(poolName, schedulingMode, minShare, weight, maxRunning)
       rootPool.addSchedulable(pool)
       logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         poolName, schedulingMode, minShare, weight))
@@ -142,7 +152,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
         // we will create a new pool that user has configured in app
         // instead of being defined in xml file
         parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
-          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
+          DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT, DEFAULT_MAXIMUM_RUNNING)
         rootPool.addSchedulable(parentPool)
         logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ed3dde0fc305..892fc5220e79 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -126,7 +126,7 @@ private[spark] class TaskSchedulerImpl(
   def initialize(backend: SchedulerBackend) {
     this.backend = backend
     // temporarily set rootPool name to empty
-    rootPool = new Pool("", schedulingMode, 0, 0)
+    rootPool = new Pool("", schedulingMode, 0, 0, 0)
     schedulableBuilder = {
       schedulingMode match {
         case SchedulingMode.FIFO =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 82455b0426a5..37f32eb2cd4f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -91,6 +91,7 @@ private[spark] class TaskSetManager(
 
   var weight = 1
   var minShare = 0
+  val maxRunning = 0
   var priority = taskSet.priority
   var stageId = taskSet.stageId
   var name = "TaskSet_" + taskSet.stageId.toString
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index 9ba2af54dacf..e8f736620acb 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -40,6 +40,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
       <thead>
         <th>Pool Name</th>
         <th>Minimum Share</th>
+        <th>Maximum Running Stages</th>
         <th>Pool Weight</th>
         <th>Active Stages</th>
         <th>Running Tasks</th>
@@ -65,6 +66,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: StagesTab) {
         <a href={href}>{p.name}</a>
       </td>
       <td>{p.minShare}</td>
+      <td>{p.maxRunning}</td>
       <td>{p.weight}</td>
       <td>{activeStages}</td>
       <td>{p.runningTasks}</td>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 467796d7c24b..6403f62aad13 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -44,11 +44,22 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     assert(nextTaskSetToSchedule.get.stageId === expectedStageId)
   }
 
+  /**
+   * Asserts that no task set exposed by the pool still has tasks left to launch, i.e. every
+   * task set returned by getSortedTaskSetQueue has runningTasks + tasksSuccessful >= numTasks.
+   */
+  def verifyNoRemainedTask(rootPool: Pool): Unit = {
+    val taskSetQueue = rootPool.getSortedTaskSetQueue
+    val nextTaskSetToSchedule =
+      taskSetQueue.find(t => (t.runningTasks + t.tasksSuccessful) < t.numTasks)
+    assert(nextTaskSetToSchedule.isEmpty)
+  }
+
   test("FIFO Scheduler Test") {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
 
-    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+    val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0, 0)
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
 
@@ -78,7 +89,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
     val taskScheduler = new TaskSchedulerImpl(sc)
 
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 0)
     val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
     schedulableBuilder.buildPools()
 
@@ -137,19 +148,19 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext("local", "TaskSchedulerImplSuite")
     val taskScheduler = new TaskSchedulerImpl(sc)
 
-    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
-    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 0)
+    val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1, 0)
+    val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1, 0)
     rootPool.addSchedulable(pool0)
     rootPool.addSchedulable(pool1)
 
-    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
-    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+    val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2, 0)
+    val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1, 0)
     pool0.addSchedulable(pool00)
     pool0.addSchedulable(pool01)
 
-    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
-    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+    val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2, 0)
+    val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1, 0)
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
 
@@ -178,4 +189,50 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     scheduleTaskAndVerifyId(2, rootPool, 6)
     scheduleTaskAndVerifyId(3, rootPool, 2)
   }
+
+  test("Fair Scheduler maxRunning Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    // maxRunning = 3: only the first three task sets in queue order are eligible here.
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, 3)
+
+    val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
+    val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
+    val taskSetManager2 = createTaskSetManager(2, 2, taskScheduler)
+    val taskSetManager3 = createTaskSetManager(3, 2, taskScheduler)
+
+    rootPool.addSchedulable(taskSetManager0)
+    rootPool.addSchedulable(taskSetManager1)
+    rootPool.addSchedulable(taskSetManager2)
+    rootPool.addSchedulable(taskSetManager3)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 1)
+    scheduleTaskAndVerifyId(2, rootPool, 2)
+    scheduleTaskAndVerifyId(3, rootPool, 0)
+    scheduleTaskAndVerifyId(4, rootPool, 1)
+    scheduleTaskAndVerifyId(5, rootPool, 2)
+
+    verifyNoRemainedTask(rootPool)
+  }
+
+  test("Fair Scheduler negative maxRunning Test") {
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
+
+    // A negative limit must behave like "no limit" rather than hiding every schedulable.
+    val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0, -1)
+
+    val taskSetManager0 = createTaskSetManager(0, 1, taskScheduler)
+    val taskSetManager1 = createTaskSetManager(1, 1, taskScheduler)
+
+    rootPool.addSchedulable(taskSetManager0)
+    rootPool.addSchedulable(taskSetManager1)
+
+    scheduleTaskAndVerifyId(0, rootPool, 0)
+    scheduleTaskAndVerifyId(1, rootPool, 1)
+
+    verifyNoRemainedTask(rootPool)
+  }
 }