Skip to content
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@

package org.apache.spark

import scala.language.implicitConversions

import java.io._
import java.net.URI
import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.JavaConverters._
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.language.implicitConversions
import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -836,18 +838,22 @@ class SparkContext(config: SparkConf) extends Logging {
}

/**
* Return pools for fair scheduler
* TODO(xiajunluan): We should take nested pools into account
* :: DeveloperApi ::
* Return pools for fair scheduler
* TODO(xiajunluan): We should take nested pools into account
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is an API comment now, let's move the TODO to a // comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch; that would have been an embarrassing doc

*/
def getAllPools: ArrayBuffer[Schedulable] = {
taskScheduler.rootPool.schedulableQueue
@DeveloperApi
def getAllPools: Array[Schedulable] = {
taskScheduler.rootPool.schedulableQueue.asScala.toArray
}

/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}

/**
Expand Down
36 changes: 18 additions & 18 deletions core/src/main/scala/org/apache/spark/scheduler/Pool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.spark.scheduler

import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap

import org.apache.spark.Logging
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
Expand All @@ -35,18 +37,15 @@ private[spark] class Pool(
extends Schedulable
with Logging {

var schedulableQueue = new ArrayBuffer[Schedulable]
var schedulableNameToSchedulable = new HashMap[String, Schedulable]

val schedulableQueue = new LinkedBlockingQueue[Schedulable]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: ConurrentLinkedQueue is a more suitable data structure for this type of access -- it provides a very efficient linked list, and it uses "add()" instead of "offer()". Also no blocking stuff.

val schedulableNameToSchedulable = new ConcurrentHashMap[String, Schedulable]
var weight = initWeight
var minShare = initMinShare
var runningTasks = 0

var priority = 0

// A pool's stage id is used to break the tie in scheduling.
var stageId = -1

var name = poolName
var parent: Pool = null

Expand All @@ -60,21 +59,21 @@ private[spark] class Pool(
}

override def addSchedulable(schedulable: Schedulable) {
schedulableQueue += schedulable
schedulableNameToSchedulable(schedulable.name) = schedulable
schedulableQueue.offer(schedulable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we add a require(schedule != null) here? It would cause a NPE anyway, I think this would just make it clearer. It also makes our use of Option(schedulableNameToSchedulable.get(...)) more clearly correct.

schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}

override def removeSchedulable(schedulable: Schedulable) {
schedulableQueue -= schedulable
schedulableNameToSchedulable -= schedulable.name
schedulableQueue.remove(schedulable)
schedulableNameToSchedulable.remove(schedulable.name)
}

override def getSchedulableByName(schedulableName: String): Schedulable = {
if (schedulableNameToSchedulable.contains(schedulableName)) {
return schedulableNameToSchedulable(schedulableName)
if (schedulableNameToSchedulable.containsKey(schedulableName)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, man, good catch

return schedulableNameToSchedulable.get(schedulableName)
}
for (schedulable <- schedulableQueue) {
for (schedulable <- schedulableQueue.asScala) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ".iterator" might work here and below, which would be a bit nicer since the documentation specifically provides guarantees about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, actually I think you actually just imported JavaConverters instead of JavaConversions. Probably don't need this stuff if you do that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that would be good

val sched = schedulable.getSchedulableByName(schedulableName)
if (sched != null) {
return sched
Expand All @@ -84,22 +83,23 @@ private[spark] class Pool(
}

override def executorLost(executorId: String, host: String) {
schedulableQueue.foreach(_.executorLost(executorId, host))
schedulableQueue.asScala.foreach(_.executorLost(executorId, host))
}

override def checkSpeculatableTasks(): Boolean = {
var shouldRevive = false
for (schedulable <- schedulableQueue) {
for (schedulable <- schedulableQueue.asScala) {
shouldRevive |= schedulable.checkSpeculatableTasks()
}
shouldRevive
}

override def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager] = {
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue = schedulableQueue.sortWith(taskSetSchedulingAlgorithm.comparator)
val sortedSchedulableQueue =
schedulableQueue.asScala.toArray.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue()
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.concurrent.LinkedBlockingQueue

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
Expand All @@ -28,7 +30,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
private[spark] trait Schedulable {
var parent: Pool
// child queues
def schedulableQueue: ArrayBuffer[Schedulable]
def schedulableQueue: LinkedBlockingQueue[Schedulable]
def schedulingMode: SchedulingMode
def weight: Int
def minShare: Int
Expand All @@ -42,5 +44,5 @@ private[spark] trait Schedulable {
def getSchedulableByName(name: String): Schedulable
def executorLost(executorId: String, host: String): Unit
def checkSpeculatableTasks(): Boolean
def getSortedTaskSetQueue(): ArrayBuffer[TaskSetManager]
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private[spark] class TaskSchedulerImpl(
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue()
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)

// For now, pool information is only accessible in live UIs
val pools = if (live) sc.getAllPools else Seq[Schedulable]()
val pools = if (live) sc.getAllPools else Array[Schedulable]()
val poolTable = new PoolTable(pools, parent)

val summary: NodeSeq =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
}

def resourceOffer(rootPool: Pool): Int = {
val taskSetQueue = rootPool.getSortedTaskSetQueue()
val taskSetQueue = rootPool.getSortedTaskSetQueue
/* Just for Test*/
for (manager <- taskSetQueue) {
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(
Expand Down