Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Starting executor with port: ${rpcEnv.address.port.toString}")

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the executor isn't actually started here, can we instead print "setting spark.executor.port to {}"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure.

}

// Create an instance of the class with the given name, possibly initializing it with our conf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ private[spark] class CoarseMesosSchedulerBackend(
.setValue(value)
.build())
}

environment.addVariables(Environment.Variable.newBuilder()
.setName("AVAILABLE_PORTS")
.setValue(getRangeResource(offer.getResourcesList, "ports").mkString(" ")))

val command = CommandInfo.newBuilder()
.setEnvironment(environment)

Expand Down Expand Up @@ -376,8 +381,10 @@ private[spark] class CoarseMesosSchedulerBackend(

val (afterCPUResources, cpuResourcesToUse) =

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you factor this all out into a getResources() method?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

partitionResources(resources, "cpus", taskCPUs)
val (resourcesLeft, memResourcesToUse) =
val (remainingMemResources, memResourcesToUse) =

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/remainingMemResources/afterMemResources to be consistent w/ afterCPUResources

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i will fix that.

partitionResources(afterCPUResources.asJava, "mem", taskMemory)
val (resourcesLeft, portResourcesToUse) = remainingMemResources
.partition {r => ! ( r.getType == Value.Type.RANGES & r.getName == "ports" )}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there are still style problems here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I haven't finished it yet. Port resources need to be managed the same way as cpus (e.g. with global bookkeeping). But what exactly is the problem for you?


val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
Expand All @@ -386,6 +393,7 @@ private[spark] class CoarseMesosSchedulerBackend(
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
.addAllResources(portResourcesToUse.asJava)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
Expand All @@ -407,13 +415,16 @@ private[spark] class CoarseMesosSchedulerBackend(
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
val ports = getRangeResource(resources, "ports")
val meetsPortRequirements = checkPorts(sc, ports)

cpus > 0 &&
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors() < executorLimit &&
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
meetsPortRequirements
}

private def executorCores(offerCPUs: Int): Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,19 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The additions to this file seem overly complex for just grabbing port resources. Maybe all this complexity is necessary, but I'm skeptical. I started in on it and got lost. I don't have time to review it all today. Please ensure there is no more room for simplification.

/**
 * Collects every range of the named resource from a Mesos resource list.
 *
 * @param res the mesos resource list
 * @param name the name of the resource to look up
 * @return the (begin, end) pairs of all ranges found on resources called `name`, in list order
 */
protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
  // The same resource name may appear more than once in an offer (it can come from a
  // specific role or from the wildcard role), so the ranges of all matches are merged.
  val ranges = for {
    resource <- res.asScala
    if resource.getName == name
    range <- resource.getRanges.getRangeList.asScala
  } yield (range.getBegin, range.getEnd)
  ranges.toList
}

/**
* Signal that the scheduler has registered with Mesos.
*/
Expand Down Expand Up @@ -353,4 +366,25 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}

/**
 * Checks whether the offered port ranges can satisfy the executor's port requirements.
 *
 * The ports considered are `spark.executor.port` and `spark.blockManager.port`, each read
 * from the Spark configuration with a default of 0. Ports left at 0 are not pinned to a
 * specific value and are therefore skipped by the range check.
 *
 * @param sc the Spark Context whose configuration supplies the ports
 * @param ports the offered port ranges as inclusive (begin, end) pairs
 * @return true if every non-zero configured port lies inside one of the offered ranges and
 *         the ranges contain at least as many ports as there are ports to allocate,
 *         false otherwise
 */
protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {

  def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
    // short-circuit && instead of the non-short-circuit & used previously
    ps.exists { case (begin, end) => begin <= port && port <= end }
  }

  val portsToCheck = List(sc.conf.getInt("spark.executor.port", 0),
    sc.conf.getInt("spark.blockManager.port", 0))
  val nonZeroPorts = portsToCheck.filter(_ != 0)
  val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))

  // make sure we have enough ports to allocate per offer (ranges are inclusive, hence + 1)
  val offeredPortCount = ports.map { case (begin, end) => end - begin + 1 }.sum
  offeredPortCount >= portsToCheck.size && withinRange
}

}
150 changes: 112 additions & 38 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1940,57 +1940,131 @@ private[spark] object Utils extends Logging {
}

/**
* Attempt to start a service on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
*
* @param startPort The initial port to start the service on.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
* @param conf A SparkConf used to get the maximum number of retries when binding to a port.
* @param serviceName Name of the service.
* @return (service: T, port: Int)
*/
* Attempt to start a service on the given port, or fail after a number of attempts.
* Each subsequent attempt uses 1 + the port used in the previous attempt (unless the port is 0).
* If the env var AVAILABLE_PORTS is set, it restricts which ports are tried after startPort:
* retries (and the startPort == 0 case) pick ports from the ranges it lists.
*
* @param startPort The initial port to start the service on.
* @param startService Function to start service on a given port.
* This is expected to throw java.net.BindException on port collision.
* @param conf A SparkConf used to get the maximum number of retries when binding to a port.
* @param serviceName Name of the service.
* @return (service: T, port: Int)
*/
def startServiceOnPort[T](
startPort: Int,
startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {
startPort: Int,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix indentation

startService: Int => (T, Int),
conf: SparkConf,
serviceName: String = ""): (T, Int) = {

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"

// Helper functions. They all share the enclosing method's parameters; an abstract
// service class common to all services might be a better fit here.

// Parses a space-separated list of port ranges, each written as "(begin,end)",
// e.g. "(31100,31200) (41100,43200)", into (begin, end) pairs.
// Blank input, or extra spaces around or between ranges, produce no entries instead of
// failing on an empty token.
def portRangeToList(ranges: String): List[(Long, Long)] = {
  ranges.split(" ").iterator
    .map(_.trim)
    .filter(_.nonEmpty)
    .map { token =>
      // strip the surrounding parentheses, then split "begin,end"
      val bounds = token.substring(1, token.length - 1).split(",")
      (bounds(0).trim.toLong, bounds(1).trim.toLong)
    }.toList
}

// Makes a single attempt to start the service on tryPort.
// Returns (Some(service), boundPort) on success. A bind collision is logged and reported
// as (None, -1); any other exception propagates to the caller.
def startOnce(tryPort: Int): (Option[T], Int) = {
  val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
  try {
    startService(tryPort) match {
      case (service, port) =>
        logInfo(s"Successfully started service$serviceString on port $port.")
        (Some(service), port)
    }
  } catch {
    case bindFailure: Exception if isBindCollision(bindFailure) =>
      logWarning(s"Service$serviceString could not bind on port $tryPort. ")
      (None, -1)
  }
}

def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra space

for (offset <- 0 to maxRetries) {
val tryPort = next(offset)
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logWarning(s"Service$serviceString could not bind on port $tryPort.")
}
}
// Should never happen
throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}

// Starts the service on a port taken from the ranges listed in the AVAILABLE_PORTS
// environment variable, never retrying startPort itself.
// With rand = true the candidates are tried in random order, otherwise in ascending order.
// Throws BindException if the ranges leave no candidate port.
def startFromAvailable(rand: Boolean = false): (T, Int) = {

  // The caller only takes this path when AVAILABLE_PORTS is set; an absent value is
  // treated as "no ports offered" rather than failing on Option.get.
  val ports = sys.env.get("AVAILABLE_PORTS").map(portRangeToList).getOrElse(Nil)

  // flatMap (unlike reduce) also copes with an empty list of ranges
  val filteredPorts = ports.flatMap { case (begin, end) => begin to end }
    .distinct.filterNot(_ == startPort)

  if (filteredPorts.isEmpty) {
    throw new BindException(
      s"Service$serviceString has no candidate port left in AVAILABLE_PORTS")
  }

  // retryPort makes maxRetries + 1 attempts (offsets 0 to maxRetries), so the highest
  // offset must still be a valid index into the candidate array.
  val maxRetries = Math.min(portMaxRetries(conf), filteredPorts.size - 1)

  val availPorts = {
    if (rand) {
      // shuffle before truncating so that every offered port can be picked
      scala.util.Random.shuffle(filteredPorts).take(maxRetries + 1).toArray
    } else {
      filteredPorts.sorted.toArray
    }
  }

  val tryPort = (x: Int) => availPorts(x).toInt

  retryPort(tryPort, maxRetries)
}

// main port selection logic
require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
if (sys.env.get("AVAILABLE_PORTS").isDefined) { // check if there are deployment restrictions

if (startPort != 0) {

val (service, port) = startOnce(startPort) // try once the provided specific port

if (port != -1) {
(service.get, port) // success
} else {
// try next available port one by one
startFromAvailable()
}
} else {
// try a random port from the available ones
startFromAvailable(true)
}

} else { // no restrictions, handled as usual, exactly the same path as in previous versions

val maxRetries = portMaxRetries(conf)

// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
val tryPort = (x: Int) => if (startPort == 0) {
startPort
} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logWarning(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
((startPort + x - 1024) % (65536 - 1024)) + 1024
}

retryPort(tryPort, maxRetries)
}
// Should never happen
throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}


/**
* Return whether the exception is caused by an address-port collision when binding.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.collection.mutable.ArrayBuffer

import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Matchers._
import org.mockito.Mockito._
Expand Down Expand Up @@ -178,12 +178,14 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
test("mesos doesn't register twice with the same shuffle service") {
setBackend(Map("spark.shuffle.service.enabled" -> "true"))
val (mem, cpu) = (backend.executorMemory(sc), 4)
val offeredPorts1 = (31100L, 31200L)
val offeredPorts2 = (41100L, 43200L)

val offer1 = createOffer("o1", "s1", mem, cpu)
val offer1 = createOffer("o1", "s1", mem, cpu, offeredPorts1)
backend.resourceOffers(driver, List(offer1).asJava)
verifyTaskLaunched("o1")

val offer2 = createOffer("o2", "s1", mem, cpu)
val offer2 = createOffer("o2", "s1", mem, cpu, offeredPorts2)
backend.resourceOffers(driver, List(offer2).asJava)
verifyTaskLaunched("o2")

Expand All @@ -197,10 +199,10 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite

test("mesos kills an executor when told") {
setBackend()

val offeredPorts = (31100L, 31200L)
val (mem, cpu) = (backend.executorMemory(sc), 4)

val offer1 = createOffer("o1", "s1", mem, cpu)
val offer1 = createOffer("o1", "s1", mem, cpu, offeredPorts)
backend.resourceOffers(driver, List(offer1).asJava)
verifyTaskLaunched("o1")

Expand All @@ -219,8 +221,9 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
}

private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
val offeredPorts = (31100L, 31200L) // same for all slaves
val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2, offeredPorts)}

backend.resourceOffers(driver, mesosOffers.asJava)
}
Expand Down Expand Up @@ -258,7 +261,8 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
TaskID.newBuilder().setValue(taskId).build()
}

private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int,
ports: (Long, Long)): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
Expand All @@ -268,6 +272,14 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
builder.addResourcesBuilder()
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
.setBegin(ports._1).setEnd(ports._2).build()))
builder.setId(OfferID.newBuilder()
.setValue(offerId).build())

builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
Expand Down