1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -244,6 +244,7 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
logInfo(s"Starting executor with port: ${rpcEnv.address.port.toString}")
}

// Create an instance of the class with the given name, possibly initializing it with our conf
@@ -18,6 +18,7 @@
package org.apache.spark.executor

import java.nio.ByteBuffer
import java.util
Contributor: is this used?


import scala.collection.JavaConverters._

@@ -72,6 +73,12 @@ private[spark] class MesosExecutorBackend
Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue))
val conf = new SparkConf(loadDefaults = true).setAll(properties)
val port = conf.getInt("spark.executor.port", 0)
val ranges = getRangesFromResources(executorInfo)

if (ranges.nonEmpty) {
Utils.setPortRangeRestriction(conf, ranges)
}

val env = SparkEnv.createExecutorEnv(
conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false)

Expand All @@ -81,6 +88,16 @@ private[spark] class MesosExecutorBackend
env)
}

private def getRangesFromResources(executorInfo: ExecutorInfo): List[(Long, Long)] = {
val ranges = executorInfo.getResourcesList.asScala
.filter(r => r.getType == Value.Type.RANGES & r.getName == "ports")
.toList.map(r => r.getRanges.getRangeList)

// a final list of all ranges in format (Long , Long)
ranges.flatMap { ranges => ranges.asScala.map { r => (r.getBegin, r.getEnd) }}
Contributor: need to indent this block 2 more spaces

Contributor Author: Shouldn't this be caught by scalastyle? Maybe not? I was feeling confident when it passed with zero errors/warnings...

Contributor: scalastyle is incomplete. I think it gets us 60% of the way there, but there are still many things it doesn't catch.

}
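For reference, a sketch of this helper with the reviewers' style notes applied: the two extra spaces of continuation indent, plus && in place of & (my assumption, mirroring the style comment on the scheduler side; on Booleans the behavior is the same either way):

// Hypothetical cleaned-up version of getRangesFromResources; not the PR's final code.
private def getRangesFromResources(executorInfo: ExecutorInfo): List[(Long, Long)] = {
  executorInfo.getResourcesList.asScala
    .filter(r => r.getType == Value.Type.RANGES && r.getName == "ports")
    .flatMap(r => r.getRanges.getRangeList.asScala.map(range => (range.getBegin, range.getEnd)))
    .toList
}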


override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
val taskId = taskInfo.getTaskId.getValue.toLong
val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData)
@@ -21,6 +21,7 @@ import java.io.File
import java.util.{ArrayList => JArrayList, Collections, List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.{Scheduler => MScheduler, _}
@@ -140,8 +141,13 @@ private[spark] class MesosSchedulerBackend(
val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))

val (resourcesAfterPort, usedPortResources) = resourcesAfterMem.partition {
r => ! ( r.getType == Value.Type.RANGES & r.getName == "ports" )
Contributor: style

val (resourcesAfterPort, usedPortResources) = resourcesAfterMem.partition { r =>
  r.getType == Value.Type.RANGES && r.getName == "ports"
}

Contributor Author: It is easy to miss that :( I didn't get any warning or anything... my bad.

}

builder.addAllResources(usedCpuResources.asJava)
builder.addAllResources(usedMemResources.asJava)
builder.addAllResources(usedPortResources.asJava)

sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command))

@@ -155,7 +161,7 @@
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}

(executorInfo.build(), resourcesAfterMem.asJava)
(executorInfo.build(), resourcesAfterPort.asJava)
}

/**
@@ -164,7 +170,7 @@
*/
private def createExecArg(): Array[Byte] = {
if (execArgs == null) {
val props = new HashMap[String, String]
val props = new mutable.HashMap[String, String]
for ((key, value) <- sc.conf.getAll) {
props(key) = value
}
@@ -244,20 +250,23 @@ private[spark] class MesosSchedulerBackend(
val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o =>
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val ports = getRangeResource(o.getResourcesList, "ports")
val slaveId = o.getSlaveId.getValue
val offerAttributes = toAttributeMap(o.getAttributesList)

// check offers for
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
// 3. ports
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
val meetsPortRequirements = checkPorts(sc, ports)
val meetsRequirements =
(meetsMemoryRequirements && meetsCPURequirements) ||
(meetsMemoryRequirements && meetsCPURequirements && meetsPortRequirements) ||
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: "
+ s"$offerAttributes mem: $mem cpu: $cpus")
+ s"$offerAttributes mem: $mem cpu: $cpus ports: ${ports.mkString(",")}")

meetsRequirements
}
@@ -140,6 +140,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
}

protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
// A resource can have multiple values in the offer since it can either be from
// a specific role or wildcard.
res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
.map(r => (r.getBegin, r.getEnd)).toList).toList
}

protected def markRegistered(): Unit = {
registerLatch.countDown()
}
@@ -340,4 +347,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
}

protected def getExecutorRelatedPort(sc: SparkContext, configProperty: String): Int = {
// either: spark.blockManager.port or spark.executor.port
Contributor: I don't think this is a great abstraction; you're passing the whole sc in here but only to get its conf. The caller can just get the port themselves easily.
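A minimal sketch of the inlining the reviewer suggests, with the caller reading the conf directly (hypothetical call-site code, not from the PR):

val executorPort = sc.conf.getInt("spark.executor.port", 0)         // 0 means "any port"
val blockManagerPort = sc.conf.getInt("spark.blockManager.port", 0)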

Contributor Author: yes, my bad...

sc.conf.getInt(s"$configProperty", 0)
}

protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = {
Contributor: you need to document what these tuples mean. This method needs a javadoc.

Contributor Author: OK, a comment is needed there at least... I missed it. On the other hand, judging from the pre-existing methods, not all of them have a proper javadoc with parameters etc., so the whole documentation process seemed a bit ad hoc to me. I am confused. Compare the def startServiceOnPort doc versus any other method in Utils. Also, I don't see much in the Scala style doc. Maybe I am wrong and the idea is to keep it simple and add full comments only when things are complex...

Contributor: Yeah, the existing code is not perfect, especially the Mesos integration code, which really lacks documentation. In general I would err on the side of adding more comments rather than none at all, since the goal of adding comments is to make it easy for someone new to follow. Even just a sentence on what the parameters and return value mean is better than nothing.
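As an illustration of what the reviewers are asking for, one possible scaladoc for checkPorts might read (my wording, not the PR's):

/**
 * Checks whether the ports this executor needs fall within the port ranges offered by Mesos.
 *
 * @param sc the SparkContext whose conf supplies spark.executor.port and spark.blockManager.port
 * @param ports the offered port ranges as (begin, end) tuples, both bounds inclusive
 * @return true if every non-zero configured port lies inside some offered range and the offer
 *         contains at least as many ports as there are ports to allocate
 */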


def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = {
ps.exists(r => r._1 <= port & r._2 >= port)
}

val portsToCheck = List(getExecutorRelatedPort(sc, "spark.executor.port"),
getExecutorRelatedPort(sc, "spark.blockManager.port"))
val nonZeroPorts = portsToCheck.filter(_ != 0)
val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports))

// make sure we have enough ports to allocate per offer
ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange
}
}
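To make the tuple semantics concrete, here is a small self-contained sketch of the same check with made-up values (ranges are inclusive on both ends):

object PortCheckExample extends App {
  def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean =
    ps.exists(r => r._1 <= port && r._2 >= port)

  // The offer contains 31000-31004 and 32000-32004: 10 ports in total.
  val offered = List((31000L, 31004L), (32000L, 32004L))
  // spark.executor.port = 31002 (fixed), spark.blockManager.port = 0 (any free port).
  val portsToCheck = List(31002, 0)

  val nonZeroPorts = portsToCheck.filter(_ != 0)
  val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, offered))
  val enoughPorts = offered.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size

  println(withinRange && enoughPorts) // true: 31002 is in a range, and 10 >= 2
}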

147 changes: 123 additions & 24 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1967,39 +1967,138 @@ private[spark] object Utils extends Logging {
conf: SparkConf,
serviceName: String = ""): (T, Int) = {
Contributor: this method is now very large and difficult to follow. Is all the logic required? If so, we should make smaller helper methods with good documentation.

Contributor Author: Unfortunately, if I didn't do it this way, this method would have been similar to, but not the same as, the pre-existing method in this file, and redundancy would then have been a problem. The logic is fairly complex, and it is the minimum needed to cover all the cases.


val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"

def portRangeToList(ranges: String): List[(Long, Long)] = {
ranges.split(":").map { r => val ret = r.substring(1, r.length - 1).split(",")
(ret(0).toLong, ret(1).toLong)
}.toList
}

def startOnce(tryPort: Int): (Option[T], Int) = {
val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
(Some(service), port)
} catch {
case e: Exception if isBindCollision(e) =>
logWarning(s"Service$serviceString could not bind on port $tryPort. ")
(None, -1)
}
}

def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = {

for (offset <- 0 to maxRetries) {
val tryPort = next(offset)
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logWarning(s"Service$serviceString could not bind on port $tryPort.")
}
}
// Should never happen
throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}

def startFromAvailable(rand: Boolean = false): (T, Int) = {

val ports = portRangeToList(getPortRangeRestriction(conf).get) // checked above for empty

val filteredPorts = ports.map(r => (r._1 to r._2).toList).reduce(_ ++ _).
distinct.filterNot(_ == startPort)
val maxRetries = Math.min(portMaxRetries(conf), filteredPorts.size)

val availPorts = {
if (rand) {
scala.util.Random.shuffle(filteredPorts.take(maxRetries)).toArray
} else {
filteredPorts.sorted.toArray
}
}

val tryPort = (x: Int) => availPorts(x).toInt

retryPort(tryPort, maxRetries)
}

require(startPort == 0 || (1024 <= startPort && startPort < 65536),
"startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")

val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'"
val maxRetries = portMaxRetries(conf)
for (offset <- 0 to maxRetries) {
if (hasPortRangeRestriction(conf)) {

if (startPort != 0) {

val (service, port) = startOnce(startPort)

if (port != -1) {
(service.get, port)
} else {
// try other
startFromAvailable()
}
} else {
// try random
startFromAvailable(true)
}

} else {

val maxRetries = portMaxRetries(conf)

// Do not increment port if startPort is 0, which is treated as a special port
val tryPort = if (startPort == 0) {
val tryPort = (x: Int) => if (startPort == 0) {
startPort
} else {
// If the new port wraps around, do not try a privilege port
((startPort + offset - 1024) % (65536 - 1024)) + 1024
}
try {
val (service, port) = startService(tryPort)
logInfo(s"Successfully started service$serviceString on port $port.")
return (service, port)
} catch {
case e: Exception if isBindCollision(e) =>
if (offset >= maxRetries) {
val exceptionMessage =
s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!"
val exception = new BindException(exceptionMessage)
// restore original stack trace
exception.setStackTrace(e.getStackTrace)
throw exception
}
logWarning(s"Service$serviceString could not bind on port $tryPort. " +
s"Attempting port ${tryPort + 1}.")
((startPort + x - 1024) % (65536 - 1024)) + 1024
}

retryPort(tryPort, maxRetries)
}
// Should never happen
throw new SparkException(s"Failed to start service$serviceString on port $startPort")
}
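For context, a hypothetical call site exercising the restricted path by binding a plain ServerSocket under a Mesos-style port range (names and values are illustrative, not from the PR):

import java.net.ServerSocket
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val conf = new SparkConf()
// Pretend Mesos offered us ports 31000-31010.
Utils.setPortRangeRestriction(conf, List((31000L, 31010L)))

// startPort = 0: with a restriction present, this picks from the allowed range at random.
val (socket, boundPort) = Utils.startServiceOnPort[ServerSocket](0, { port =>
  val s = new ServerSocket(port)
  (s, s.getLocalPort)
}, conf, "example-service")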

/**
* Checks if there is any port restriction (currently used only with mesos)
* @param conf the SparkConfig to use
* @return true if port range restriction holds false otherwise
*/
def hasPortRangeRestriction(conf: SparkConf): Boolean = {
// Caution: the property is for internal use only
conf.getOption("spark.mesos.executor.port.ranges").isDefined
}

/**
* Gets the allowed port ranges to use (currently used only with mesos)
Contributor: if these are only used in Mesos they don't belong here. We should put them in MesosSchedulerUtils or something.

Contributor Author: The port restriction idea could be useful in other environments for ops use cases. The reason I put them here is that another method in Utils uses them, so they sit at the same layer; I shouldn't call Mesos-specific methods from a generic method for starting a service... In other words, depending on the environment (only Mesos for now, though), we either need to make the startService code port-restriction aware or move that logic one layer up. In the latter case a larger refactoring would have been needed.

* @param conf the SparkConfig to use
* @return the string containing a list of port ranges in the format:
* (begin_port_number_1, end_port_number_1):...:(begin_port_number_n, end_port_number_n)
*/
def getPortRangeRestriction(conf: SparkConf): Option[String] = {
// Caution: the property is for internal use only
conf.getOption("spark.mesos.executor.port.ranges")
}

/**
* Sets the allowed port ranges to use (currently used only with mesos).
* @param conf the SparkConfig to use
* @param ports a list of port ranges with format: (begin_port_number, end_port_number)
*/
def setPortRangeRestriction(conf: SparkConf, ports: List[(Long, Long)]): Unit = {
// Caution: the property is for internal use only
conf.set("spark.mesos.executor.port.ranges", ports.mkString(":"))
Contributor: this is kind of an anti-pattern; conf is supposed to be immutable once we create the SparkContext. If we set things after that, some things might get the new value but others might not. It's a confusing pattern that we try to shy away from in general.

Contributor Author: OK, I agree, but it seemed the only way. A proper abstraction for port-related restrictions would require more refactoring; that is why I followed this approach.

}
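For illustration, the internal string format this property carries (tuple toString joined with ":"), and how portRangeToList inside startServiceOnPort parses it back; the values are made up:

val ranges = List((31000L, 31010L), (32000L, 32005L))
val encoded = ranges.mkString(":")  // "(31000,31010):(32000,32005)"

// The inverse, as performed by portRangeToList:
val decoded = encoded.split(":").map { r =>
  val bounds = r.substring(1, r.length - 1).split(",")
  (bounds(0).toLong, bounds(1).toLong)
}.toList                            // List((31000,31010), (32000,32005))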

/**