1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -231,6 +231,7 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}

// Create an instance of the class with the given name, possibly initializing it with our conf

@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{Buffer, HashMap, HashSet}

import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}

@@ -71,13 +70,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[String, Int]
val coresByTaskId = new mutable.HashMap[String, Int]
var totalCoresAcquired = 0

// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
// we need to maintain e.g. failure state and connection state.
private val slaves = new HashMap[String, Slave]
private val slaves = new mutable.HashMap[String, Slave]

/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -285,7 +284,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

private def declineUnmatchedOffers(
d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
offers.foreach { offer =>
declineOffer(d, offer, Some("unmet constraints"),
Some(rejectOfferDurationForUnmetConstraints))
@@ -302,9 +301,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerAttributes = toAttributeMap(offer.getAttributesList)
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")
val ports = getRangeResource(offer.getResourcesList, "ports")

logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
s" cpu: $cpus for $refuseSeconds seconds" +
s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
reason.map(r => s" (reason: $r)").getOrElse(""))

refuseSeconds match {
@@ -323,26 +323,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
* @param offers Mesos offers that match attribute constraints
*/
private def handleMatchedOffers(
d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
val offerPorts = getRangeResource(offer.getResourcesList, "ports")
val id = offer.getId.getValue

if (tasks.contains(offer.getId)) { // accept
val offerTasks = tasks(offer.getId)

logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
s" Launching ${offerTasks.size} Mesos tasks.")

for (task <- offerTasks) {
val taskId = task.getTaskId
val mem = getResource(task.getResourcesList, "mem")
val cpus = getResource(task.getResourcesList, "cpus")
val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")

logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
s" ports: $ports")
}

d.launchTasks(
@@ -365,9 +369,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
* @param offers Mesos offers that match attribute constraints
* @return A map from OfferID to a list of Mesos tasks to launch on that offer
*/
private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
// offerID -> tasks
val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)

// offerID -> resources
val remainingResources = mutable.Map(offers.map(offer =>
@@ -397,18 +401,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)

val (afterCPUResources, cpuResourcesToUse) =
partitionResources(resources, "cpus", taskCPUs)
val (resourcesLeft, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
val (resourcesLeft, resourcesToUse) =
partitionTaskResources(resources, taskCPUs, taskMemory)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)

taskBuilder.addAllResources(resourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
@@ -428,18 +430,39 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}

/** Extracts the resources needed by a task from the list of available resources. */
private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
: (List[Resource], List[Resource]) = {

// partition cpus & mem
val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
val (afterMemResources, memResourcesToUse) =
partitionResources(afterCPUResources.asJava, "mem", taskMemory)
Review comment: newline here

@skonto (Contributor Author), Jul 4, 2016: I do not see exactly where I should place a new line. But I will separate the vals anyway by adding comments on top of them. Vals should be grouped together according to the guidelines.

Reply: I just thought a newline separating the port handling would improve readability. I generally prefer to separate each logical block with newlines. Not a blocker.

@skonto (Contributor Author): ok np


// If user specifies port numbers in SparkConfig then consecutive tasks will not be launched
// on the same host. This essentially means one executor per host.
// TODO: handle network isolator case
val (nonPortResources, portResourcesToUse) =
partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)

(nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
}

private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
val ports = getRangeResource(resources, "ports")
val meetsPortRequirements = checkPorts(sc.conf, ports)

cpus > 0 &&
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors() < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements
}

private def executorCores(offerCPUs: Int): Int = {
@@ -613,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}

private class Slave(val hostname: String) {
val taskIDs = new HashSet[String]()
val taskIDs = new mutable.HashSet[String]()
var taskFailures = 0
var shuffleRegistered = false
}

@@ -47,6 +47,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {

Review comment: The additions to this file seem overly complex for just grabbing port resources. Maybe all this complexity is necessary, but I'm skeptical. I started in on it and got lost. I don't have time to review it all today. Please ensure there is no more room for simplification.

/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*

Review comment: Please remove these added asterisks.

@skonto (Contributor Author), Aug 10, 2016: I think it needs the added asterisks: https://github.com/databricks/scala-style-guide
It is standard javadoc.

Reply: ok

* @param masterUrl The url to connect to Mesos master
* @param scheduler the scheduler class to receive scheduler callbacks
* @param sparkUser User to impersonate with when running tasks
@@ -147,6 +148,20 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}

/**
* Transforms a range resource into a list of (begin, end) ranges.
*
* @param res the mesos resource list
* @param name the name of the resource
* @return the list of (begin, end) ranges for the named resource
*/
protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
// A resource can have multiple values in the offer since it can either be from
// a specific role or wildcard.
res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
.map(r => (r.getBegin, r.getEnd)).toList).toList
}
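
As a rough usage sketch (illustrative values only, built with the same Protos builders used further down in this file), a single offered "ports" resource with one range maps to a one-element list of (begin, end) pairs:

// Hypothetical example; assumes this trait's existing imports (Mesos Protos, JavaConverters).
val portsResource = Resource.newBuilder()
  .setName("ports")
  .setType(Value.Type.RANGES)
  .setRanges(Value.Ranges.newBuilder()
    .addRange(Value.Range.newBuilder().setBegin(31000).setEnd(31010)))
  .build()
getRangeResource(List(portsResource).asJava, "ports")  // List((31000L, 31010L))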

/**
* Signal that the scheduler has registered with Mesos.
*/
@@ -172,6 +187,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Partition the existing set of resources into two groups, those remaining to be
* scheduled and those requested to be used for a new task.
*
* @param resources The full list of available resources
* @param resourceName The name of the resource to take from the available resources
* @param amountToUse The amount of resources to take from the available resources
@@ -223,7 +239,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Converts the attributes from the resource offer into a Map of name -> Attribute Value
* The attribute values are the mesos attribute types and they are
* @param offerAttributes
*
* @param offerAttributes the attributes offered
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
@@ -333,6 +350,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Return the amount of memory to allocate to each executor, taking into account
* container overheads.
*
* @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
* (whichever is larger)
@@ -357,6 +375,111 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
}

/**
* Checks whether the executor ports are within the offered ranges of ports.
*
* @param conf the Spark Config
* @param ports the list of ports to check
* @return true if the ports are within range, false otherwise
*/
protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {

def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
ps.exists { case (rangeStart, rangeEnd) => rangeStart <= port && rangeEnd >= port }
}

val portsToCheck = nonZeroPortValuesFromConfig(conf)
val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
// make sure we have enough ports to allocate per offer
val enoughPorts =
ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size
enoughPorts && withinRange
}
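
A minimal sketch of how this check behaves, with made-up values: a configured block manager port passes only when some offered range contains it and the offer carries enough ports overall.

// Hypothetical values for illustration only.
val conf = new SparkConf().set("spark.blockManager.port", "31500")
checkPorts(conf, List((31000L, 32000L)))  // true: 31500 lies within [31000, 32000]
checkPorts(conf, List((40000L, 40010L)))  // false: no offered range contains 31500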

/**
* Partitions port resources.
*
* @param requestedPorts non-zero ports to assign
* @param offeredResources the resources offered
* @return resources left, port resources to be used.
*/
def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
: (List[Resource], List[Resource]) = {
if (requestedPorts.isEmpty) {
Review comment (@mgummelt, Aug 9, 2016): Do you think saving a few cycles is worth the added complexity of special casing? I don't.

@skonto (Contributor Author), Aug 9, 2016: The logic is different now... It does not work without the shortcut; it is actually if/else. I could put the rest of the code in an else block...

Reply: I think it actually does work. You're throwing away all the port resources by only including the non-port resources in the resources left, but this is safe because no ports are requested.

Regardless, I'm fine with the special case if you want to keep it, but yeah, let's wrap the rest in an else.

@skonto (Contributor Author), Aug 9, 2016: Aha ok... anyway I wrapped it in an else block...

(offeredResources, List[Resource]())
} else {
// partition port offers
val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)

val portsAndRoles = requestedPorts.
map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))

val assignedPortResources = createResourcesFromPorts(portsAndRoles)

// ignore non-assigned port resources, they will be declined implicitly by mesos
// no need for splitting port resources.
(resourcesWithoutPorts, assignedPortResources)
}
}
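
As an illustrative sketch (hypothetical role and port values), partitioning an offer that carries a single ports range against one requested port keeps the non-port resources on the left and yields a one-port "ports" resource, tagged with the offered range's role, on the right:

// Hypothetical example; the offered list holds only one ports range resource.
val offered = createMesosPortResource(List((30000L, 31000L)), Some("*"))
val (left, toUse) = partitionPortResources(List(30100L), offered)
// left == Nil (the only offered resource was the ports range)
// toUse holds one "ports" resource whose single range is [30100, 30100], with role "*"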

val managedPortNames = List("spark.executor.port", "spark.blockManager.port")

/**
* The values of the non-zero ports to be used by the executor process.
*
* @param conf the spark config to use
* @return the non-zero values of the ports
*/
def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
}
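
For example (hypothetical config), only the ports that were explicitly set come back; unset ports default to 0 and are filtered out:

// Hypothetical: spark.executor.port is left unset, so it defaults to 0 and is dropped.
val conf = new SparkConf().set("spark.blockManager.port", "30100")
nonZeroPortValuesFromConfig(conf)  // List(30100L)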

/** Creates mesos resources from a list of (port, role) pairs. */
private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
portsAndRoles.flatMap{ case (port, role) =>
createMesosPortResource(List((port, port)), Some(role))}
}

/** Helper to create mesos resources for specific port ranges. */
private def createMesosPortResource(
ranges: List[(Long, Long)],
role: Option[String] = None): List[Resource] = {
ranges.map { case (rangeStart, rangeEnd) =>
val rangeValue = Value.Range.newBuilder()
.setBegin(rangeStart)
.setEnd(rangeEnd)
val builder = Resource.newBuilder()
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
role.foreach(r => builder.setRole(r))
builder.build()
}
}

/**
* Helper that finds the offered range containing the given port and returns that
* range's role, so the role can be attached to the resource created for the port.
*/
private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
: String = {

val ranges = portResources.
map(resource =>
(resource.getRole, resource.getRanges.getRangeList.asScala
.map(r => (r.getBegin, r.getEnd)).toList))

val rangePortRole = ranges
.find { case (role, rangeList) => rangeList
.exists { case (rangeStart, rangeEnd) => rangeStart <= port && rangeEnd >= port } }
// this is safe since we have already validated the requested ports against the offered ranges (see checkPorts)
rangePortRole.map{ case (role, rangeList) => role}.get
}

/** Partitions a list of offered mesos resources into (non-port resources, port resources). */
private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") }
}

/**
* spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
* submissions with frameworkIDs. However, this causes issues when a driver process launches

@@ -17,8 +17,6 @@

package org.apache.spark.scheduler.cluster.mesos

import java.util.Collections

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -212,6 +210,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
}

test("Port offer decline when there is no appropriate range") {
setBackend(Map("spark.blockManager.port" -> "30100"))
val offeredPorts = (31100L, 31200L)
val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
backend.resourceOffers(driver, List(offer1).asJava)
verify(driver, times(1)).declineOffer(offer1.getId)
}

test("Port offer accepted when ephemeral ports are used") {
setBackend()
val offeredPorts = (31100L, 31200L)
val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
backend.resourceOffers(driver, List(offer1).asJava)
verifyTaskLaunched(driver, "o1")
}

test("Port offer accepted with user defined port numbers") {
val port = 30100
setBackend(Map("spark.blockManager.port" -> s"$port"))
val offeredPorts = (30000L, 31000L)
val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
backend.resourceOffers(driver, List(offer1).asJava)
val taskInfo = verifyTaskLaunched(driver, "o1")

val taskPortResources = taskInfo.head.getResourcesList.asScala.
find(r => r.getType == Value.Type.RANGES && r.getName == "ports")

val isPortInOffer = (r: Resource) => {
r.getRanges().getRangeList
.asScala.exists(range => range.getBegin == port && range.getEnd == port)
}
assert(taskPortResources.exists(isPortInOffer))
}

test("mesos kills an executor when told") {
setBackend()
