diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 12c7b2048a8c8..473a070534d12 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -244,6 +244,7 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", rpcEnv.address.port.toString) } else if (rpcEnv.address != null) { conf.set("spark.executor.port", rpcEnv.address.port.toString) + logInfo(s"Starting executor with port: ${rpcEnv.address.port.toString}") } // Create an instance of the class with the given name, possibly initializing it with our conf diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index cfd9bcd65c566..861930192b365 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.executor import java.nio.ByteBuffer +import java.util import scala.collection.JavaConverters._ @@ -72,6 +73,12 @@ private[spark] class MesosExecutorBackend Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) val conf = new SparkConf(loadDefaults = true).setAll(properties) val port = conf.getInt("spark.executor.port", 0) + val ranges = getRangesFromResources(executorInfo) + + if (ranges.nonEmpty) { + Utils.setPortRangeRestriction(conf, ranges) + } + val env = SparkEnv.createExecutorEnv( conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) @@ -81,6 +88,16 @@ private[spark] class MesosExecutorBackend env) } + private def getRangesFromResources(executorInfo: ExecutorInfo): List[(Long, Long)] = { + val ranges = executorInfo.getResourcesList.asScala + .filter(r => r.getType == Value.Type.RANGES && r.getName == "ports") + .toList.map(r => r.getRanges.getRangeList) + + // a final list of 
all ranges in format (Long , Long) + ranges.flatMap { ranges => ranges.asScala.map { r => (r.getBegin, r.getEnd) }} + } + + override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { val taskId = taskInfo.getTaskId.getValue.toLong val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index eaf0cb06d6c73..7ef3ee7389c2b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -21,6 +21,7 @@ import java.io.File import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.{Scheduler => MScheduler, _} @@ -140,8 +141,13 @@ private[spark] class MesosSchedulerBackend( val (resourcesAfterMem, usedMemResources) = partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc)) + val (resourcesAfterPort, usedPortResources) = resourcesAfterMem.partition { + r => ! 
( r.getType == Value.Type.RANGES && r.getName == "ports" ) + } + builder.addAllResources(usedCpuResources.asJava) builder.addAllResources(usedMemResources.asJava) + builder.addAllResources(usedPortResources.asJava) sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) @@ -155,7 +161,7 @@ private[spark] class MesosSchedulerBackend( .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) } - (executorInfo.build(), resourcesAfterMem.asJava) + (executorInfo.build(), resourcesAfterPort.asJava) } /** @@ -164,7 +170,7 @@ private[spark] class MesosSchedulerBackend( */ private def createExecArg(): Array[Byte] = { if (execArgs == null) { - val props = new HashMap[String, String] + val props = new mutable.HashMap[String, String] for ((key, value) <- sc.conf.getAll) { props(key) = value } @@ -244,20 +250,23 @@ private[spark] class MesosSchedulerBackend( val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") + val ports = getRangeResource(o.getResourcesList, "ports") val slaveId = o.getSlaveId.getValue val offerAttributes = toAttributeMap(o.getAttributesList) // check offers for // 1. Memory requirements // 2. CPU requirements - need at least 1 for executor, 1 for task + // 3. 
ports val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) + val meetsPortRequirements = checkPorts(sc, ports) val meetsRequirements = - (meetsMemoryRequirements && meetsCPURequirements) || + (meetsMemoryRequirements && meetsCPURequirements && meetsPortRequirements) || (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) val debugstr = if (meetsRequirements) "Accepting" else "Declining" logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " - + s"$offerAttributes mem: $mem cpu: $cpus") + + s"$offerAttributes mem: $mem cpu: $cpus ports: ${ports.mkString(",")}") meetsRequirements } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 010caff3e39b2..5be0fb2ee33b9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -140,6 +140,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging { res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum } + protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = { + // A resource can have multiple values in the offer since it can either be from + // a specific role or wildcard. 
+ res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala + .map(r => (r.getBegin, r.getEnd)).toList).toList + } + protected def markRegistered(): Unit = { registerLatch.countDown() } @@ -340,4 +347,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + protected def getExecutorRelatedPort(sc: SparkContext, configProperty: String): Int = { + // either: spark.blockManager.port or spark.executor.port + sc.conf.getInt(configProperty, 0) + } + + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = { + + def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port && r._2 >= port) + } + + val portsToCheck = List(getExecutorRelatedPort(sc, "spark.executor.port"), + getExecutorRelatedPort(sc, "spark.blockManager.port")) + val nonZeroPorts = portsToCheck.filter(_ != 0) + val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + + // make sure we have enough ports to allocate per offer + ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } } + diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 9ecbffbf715c5..72cc942402134 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1967,39 +1967,138 @@ private[spark] object Utils extends Logging { conf: SparkConf, serviceName: String = ""): (T, Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + + def portRangeToList(ranges: String): List[(Long, Long)] = { + ranges.split(":").map { r => val ret = r.substring(1, r.length - 1).split(",") + (ret(0).toLong, ret(1).toLong) + }.toList + } + + def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { + val 
(service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + (Some(service), port) + } catch { + case e: Exception if isBindCollision(e) => + logWarning(s"Service$serviceString could not bind on port $tryPort. ") + (None, -1) + } + } + + def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 to maxRetries) { + val tryPort = next(offset) + try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) + } catch { + case e: Exception if isBindCollision(e) => + if (offset >= maxRetries) { + val exceptionMessage = + s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception + } + logWarning(s"Service$serviceString could not bind on port $tryPort.") + } + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") + } + + def startFromAvailable(rand: Boolean = false): (T, Int) = { + + val ports = portRangeToList(getPortRangeRestriction(conf).get) // checked above for empty + + val filteredPorts = ports.map(r => (r._1 to r._2).toList).reduce(_ ++ _). 
+ distinct.filterNot(_ == startPort) + val maxRetries = Math.min(portMaxRetries(conf), filteredPorts.size - 1) + + val availPorts = { + if (rand) { + scala.util.Random.shuffle(filteredPorts).take(maxRetries + 1).toArray + } else { + filteredPorts.sorted.toArray + } + } + + val tryPort = (x: Int) => availPorts(x).toInt + + retryPort(tryPort, maxRetries) + } + require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") - val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" - val maxRetries = portMaxRetries(conf) - for (offset <- 0 to maxRetries) { + if (hasPortRangeRestriction(conf)) { + + if (startPort != 0) { + + val (service, port) = startOnce(startPort) + + if (port != -1) { + (service.get, port) + } else { + // try other + startFromAvailable() + } + } else { + // try random + startFromAvailable(true) + } + + } else { + + val maxRetries = portMaxRetries(conf) + // Do not increment port if startPort is 0, which is treated as a special port - val tryPort = if (startPort == 0) { + val tryPort = (x: Int) => if (startPort == 0) { startPort } else { // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 - } - try { - val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") - return (service, port) - } catch { - case e: Exception if isBindCollision(e) => - if (offset >= maxRetries) { - val exceptionMessage = - s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" - val exception = new BindException(exceptionMessage) - // restore original stack trace - exception.setStackTrace(e.getStackTrace) - throw exception - } - logWarning(s"Service$serviceString could not bind on port $tryPort. 
" + - s"Attempting port ${tryPort + 1}.") + ((startPort + x - 1024) % (65536 - 1024)) + 1024 } + + retryPort(tryPort, maxRetries) } - // Should never happen - throw new SparkException(s"Failed to start service$serviceString on port $startPort") + } + + /** + * Checks if there is any port restriction (currently used only with mesos) + * @param conf the SparkConfig to use + * @return true if port range restriction holds false otherwise + */ + def hasPortRangeRestriction(conf: SparkConf): Boolean = { + // Caution: the property is for internal use only + conf.getOption("spark.mesos.executor.port.ranges").isDefined + } + + /** + * Gets the allowed port ranges to use (currently used only with mesos) + * @param conf the SparkConfig to use + * @return the string containing a list of port ranges in the format: + * (begin_port_number_1, end_port_number_1):...:(begin_port_number_n, end_port_number_n) + */ + def getPortRangeRestriction(conf: SparkConf): Option[String] = { + // Caution: the property is for internal use only + conf.getOption("spark.mesos.executor.port.ranges") + } + + /** + * Sets the allowed port ranges to use (currently used only with mesos). 
+ * @param conf the SparkConfig to use + * @param ports a list of port ranges with format: (begin_port_number, end_port_number) + */ + def setPortRangeRestriction(conf: SparkConf, ports: List[(Long, Long)]): Unit = { + // Caution: the property is for internal use only + conf.set("spark.mesos.executor.port.ranges", ports.mkString(":")) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index e111e2e9f6163..a2888da703aab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -27,7 +27,7 @@ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Matchers._ @@ -157,7 +157,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi } test("mesos resource offers result in launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int): Offer = { + def createOffer(id: Int, mem: Int, cpu: Int, ports: (Long, Long)): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -167,6 +167,11 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(ports._1).setEnd(ports._2).build())) builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) 
.setFrameworkId(FrameworkID.newBuilder().setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) @@ -191,11 +196,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val minMem = backend.calculateTotalMemory(sc) val minCpu = 4 + val offeredPorts = (31100L, 31200L) val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) + mesosOffers.add(createOffer(1, minMem, minCpu, offeredPorts)) + mesosOffers.add(createOffer(2, minMem - 1, minCpu, offeredPorts)) + mesosOffers.add(createOffer(3, minMem, minCpu, offeredPorts)) val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( @@ -242,7 +248,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi // Unwanted resources offered on an existing node. Make sure they are declined val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) + mesosOffers2.add(createOffer(1, minMem, minCpu, offeredPorts)) reset(taskScheduler) reset(driver) when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) @@ -269,6 +275,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.listenerBus).thenReturn(listenerBus) val id = 1 + val (devPortBegin, devPortEnd) = (40000, 40100) + val (prodPortBegin, prodPortEnd) = (30000, 30100) val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -280,6 +288,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setRole("prod") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(1)) + builder.addResourcesBuilder() + .setName("ports") + .setRole("prod") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange( + 
MesosRange.newBuilder().setBegin(prodPortBegin).setEnd(prodPortEnd).build())) builder.addResourcesBuilder() .setName("mem") .setRole("dev") @@ -290,6 +304,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setRole("dev") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(2)) + builder.addResourcesBuilder() + .setName("ports") + .setRole("dev") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange( + MesosRange.newBuilder().setBegin(devPortBegin).setEnd(devPortEnd).build())) val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))