From aa6698c4ebee6e593cc9aa075993b490af80ca02 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Wed, 6 Jan 2016 02:47:50 +0100 Subject: [PATCH 1/2] honour ports --- .../scala/org/apache/spark/SparkEnv.scala | 11 +- .../spark/executor/MesosExecutorBackend.scala | 19 +++ .../cluster/mesos/MesosSchedulerBackend.scala | 17 +- .../cluster/mesos/MesosSchedulerUtils.scala | 27 ++++ .../scala/org/apache/spark/util/Utils.scala | 148 +++++++++++++++--- .../mesos/MesosSchedulerBackendSuite.scala | 31 +++- 6 files changed, 218 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index b98cc964eda87..9755846159080 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -254,8 +254,16 @@ object SparkEnv extends Logging { if (port == 0 || rpcEnv.address == null) { port } else { - rpcEnv.address.port + 1 + + if (Utils.hasPortRangeRestriction(conf)) { + 0 // pick one form the range + } else { + rpcEnv.address.port + 1 + } } + + logInfo(s"port for actorsystem: ${actorSystemPort}") + // Create a ActorSystem for legacy codes AkkaUtils.createActorSystem( actorSystemName + "ActorSystem", @@ -273,6 +281,7 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", rpcEnv.address.port.toString) } else if (rpcEnv.address != null) { conf.set("spark.executor.port", rpcEnv.address.port.toString) + logInfo(s"Starting executor with port: ${rpcEnv.address.port.toString}") } // Create an instance of the class with the given name, possibly initializing it with our conf diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index d85465eb25683..c0a40470e521c 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -18,6 +18,7 @@ package org.apache.spark.executor import java.nio.ByteBuffer +import java.util import scala.collection.JavaConverters._ @@ -31,6 +32,8 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData import org.apache.spark.util.Utils + + private[spark] class MesosExecutorBackend extends MesosExecutor with ExecutorBackend @@ -72,6 +75,12 @@ private[spark] class MesosExecutorBackend Seq[(String, String)](("spark.app.id", frameworkInfo.getId.getValue)) val conf = new SparkConf(loadDefaults = true).setAll(properties) val port = conf.getInt("spark.executor.port", 0) + val ranges = getRangesFromResources(executorInfo) + + if (ranges.nonEmpty) { + Utils.setPortRangeRestriction(conf, ranges) + } + val env = SparkEnv.createExecutorEnv( conf, executorId, slaveInfo.getHostname, port, cpusPerTask, isLocal = false) @@ -81,6 +90,16 @@ private[spark] class MesosExecutorBackend env) } + private def getRangesFromResources(executorInfo: ExecutorInfo): List[(Long, Long)] = { + val ranges = executorInfo.getResourcesList.asScala + .filter(r => r.getType == Value.Type.RANGES & r.getName == "ports") + .toList.map(r => r.getRanges.getRangeList) + + // a final list of all ranges in format (Long , Long) + ranges.flatMap { ranges => ranges.asScala.map { r => (r.getBegin, r.getEnd) }} + } + + override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { val taskId = taskInfo.getTaskId.getValue.toLong val taskData = MesosTaskLaunchData.fromByteString(taskInfo.getData) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 281965a5981bb..48efd9675d64f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -21,6 +21,7 @@ import java.io.File import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.{Scheduler => MScheduler, _} @@ -139,8 +140,13 @@ private[spark] class MesosSchedulerBackend( val (resourcesAfterMem, usedMemResources) = partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc)) + val (resourcesAfterPort, usedPortResources) = resourcesAfterMem.partition { + r => ! ( r.getType == Value.Type.RANGES & r.getName == "ports" ) + } + builder.addAllResources(usedCpuResources.asJava) builder.addAllResources(usedMemResources.asJava) + builder.addAllResources(usedPortResources.asJava) sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) @@ -154,7 +160,7 @@ private[spark] class MesosSchedulerBackend( .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) } - (executorInfo.build(), resourcesAfterMem.asJava) + (executorInfo.build(), resourcesAfterPort.asJava) } /** @@ -163,7 +169,7 @@ private[spark] class MesosSchedulerBackend( */ private def createExecArg(): Array[Byte] = { if (execArgs == null) { - val props = new HashMap[String, String] + val props = new mutable.HashMap[String, String] for ((key, value) <- sc.conf.getAll) { props(key) = value } @@ -243,20 +249,23 @@ private[spark] class MesosSchedulerBackend( val (usableOffers, unUsableOffers) = offersMatchingConstraints.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") + val ports = getRangeResource(o.getResourcesList, "ports") val slaveId = o.getSlaveId.getValue val offerAttributes = toAttributeMap(o.getAttributesList) // check offers for // 1. Memory requirements // 2. CPU requirements - need at least 1 for executor, 1 for task + // 3. ports val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) + val meetsPortRequirements = checkPorts(sc, ports) val meetsRequirements = - (meetsMemoryRequirements && meetsCPURequirements) || + (meetsMemoryRequirements && meetsCPURequirements && meetsPortRequirements) || (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) val debugstr = if (meetsRequirements) "Accepting" else "Declining" logDebug(s"$debugstr offer: ${o.getId.getValue} with attributes: " - + s"$offerAttributes mem: $mem cpu: $cpus") + + s"$offerAttributes mem: $mem cpu: $cpus ports: ${ports.mkString(",")}") meetsRequirements } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 721861fbbc517..d9dc052be04b1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -140,6 +140,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging { res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum } + protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = { + // A resource can have multiple values in the offer since it can either be from + // a specific role or wildcard. + res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala + .map(r => (r.getBegin, r.getEnd)).toList).toList + } + protected def markRegistered(): Unit = { registerLatch.countDown() } @@ -340,4 +347,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") } + protected def getExecutorRelatedPort(sc: SparkContext, configProperty: String): Int = { + // either: spark.blockManager.port or spark.executor.port + sc.conf.getInt(s"$configProperty", 0) + } + + protected def checkPorts(sc: SparkContext, ports: List[(Long, Long)]): Boolean = { + + def checkIfInRange(port: Int, ps: List[(Long, Long)]): Boolean = { + ps.exists(r => r._1 <= port & r._2 >= port) + } + + val portsToCheck = List(getExecutorRelatedPort(sc, "spark.executor.port"), + getExecutorRelatedPort(sc, "spark.blockManager.port")) + val nonZeroPorts = portsToCheck.filter(_ != 0) + val withinRange = nonZeroPorts.forall(p => checkIfInRange(p, ports)) + + // make sure we have enough ports to allocate per offer + ports.map(r => r._2 - r._1 + 1).sum >= portsToCheck.size && withinRange + } } + diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b8ca6b07e4198..37715fd998a20 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -1950,39 +1950,139 @@ private[spark] object Utils extends Logging { conf: SparkConf, serviceName: String = ""): (T, Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + + def portRangeToList(ranges: String): List[(Long, Long)] = { + ranges.split(":").map { r => val ret = r.substring(1, r.length - 1).split(",") + (ret(0).toLong, ret(1).toLong) + }.toList + } + + def startOnce(tryPort: Int): (Option[T], Int) = { + val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" + try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + (Some(service), port) + } catch { + case e: Exception if isBindCollision(e) => + logWarning(s"Service$serviceString could not bind on port $tryPort. ") + (None, -1) + } + } + + def retryPort(next: Int => Int, maxRetries: Int): (T, Int) = { + + for (offset <- 0 to maxRetries) { + val tryPort = next(offset) + try { + val (service, port) = startService(tryPort) + logInfo(s"Successfully started service$serviceString on port $port.") + return (service, port) + } catch { + case e: Exception if isBindCollision(e) => + if (offset >= maxRetries) { + val exceptionMessage = + s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" + val exception = new BindException(exceptionMessage) + // restore original stack trace + exception.setStackTrace(e.getStackTrace) + throw exception + } + logWarning(s"Service$serviceString could not bind on port $tryPort.") + } + } + // Should never happen + throw new SparkException(s"Failed to start service$serviceString on port $startPort") + } + + def startFromAvailable(rand: Boolean = false): (T, Int) = { + + val ports = portRangeToList(getPortRangeRestriction(conf).get) // checked above for empty + + val filteredPorts = ports.map(r => (r._1 to r._2).toList).reduce(_ ++ _). + distinct.filterNot(_ == startPort) + val maxRetries = Math.min(portMaxRetries(conf), filteredPorts.size) + + val availPorts = { + if (rand) { + scala.util.Random.shuffle(filteredPorts.take(maxRetries)).toArray + } else { + filteredPorts.sorted.toArray + } + } + + val tryPort = (x: Int) => availPorts(x).toInt + + retryPort(tryPort, maxRetries) + } + require(startPort == 0 || (1024 <= startPort && startPort < 65536), "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.") - val serviceString = if (serviceName.isEmpty) "" else s" '$serviceName'" - val maxRetries = portMaxRetries(conf) - for (offset <- 0 to maxRetries) { + if (hasPortRangeRestriction(conf)) { + + if (startPort != 0) { + + val (service, port) = startOnce(startPort) + + if (port != -1) { + (service.get, port) + } else { + // try other + startFromAvailable() + } + } + else { + // try random + startFromAvailable(true) + } + + } else { + + val maxRetries = portMaxRetries(conf) + // Do not increment port if startPort is 0, which is treated as a special port - val tryPort = if (startPort == 0) { + val tryPort = (x: Int) => if (startPort == 0) { startPort } else { // If the new port wraps around, do not try a privilege port - ((startPort + offset - 1024) % (65536 - 1024)) + 1024 - } - try { - val (service, port) = startService(tryPort) - logInfo(s"Successfully started service$serviceString on port $port.") - return (service, port) - } catch { - case e: Exception if isBindCollision(e) => - if (offset >= maxRetries) { - val exceptionMessage = - s"${e.getMessage}: Service$serviceString failed after $maxRetries retries!" - val exception = new BindException(exceptionMessage) - // restore original stack trace - exception.setStackTrace(e.getStackTrace) - throw exception - } - logWarning(s"Service$serviceString could not bind on port $tryPort. " + - s"Attempting port ${tryPort + 1}.") + ((startPort + x - 1024) % (65536 - 1024)) + 1024 } + + retryPort(tryPort, maxRetries) } - // Should never happen - throw new SparkException(s"Failed to start service$serviceString on port $startPort") + } + + /** + * Checks if there is any port restriction (currently used only with mesos) + * @param conf the SparkConfig to use + * @return true if port range restriction holds false otherwise + */ + def hasPortRangeRestriction(conf: SparkConf): Boolean = { + // Caution: the property is for internal use only + conf.getOption("spark.mesos.executor.port.ranges").isDefined + } + + /** + * Gets the allowed port ranges to use (currently used only with mesos) + * @param conf the SparkConfig to use + * @return the string containing a list of port ranges in the format: + * (begin_port_number_1, end_port_number_1):...:(begin_port_number_n, end_port_number_n) + */ + def getPortRangeRestriction(conf: SparkConf): Option[String] = { + // Caution: the property is for internal use only + conf.getOption("spark.mesos.executor.port.ranges") + } + + /** + * Sets the allowed port ranges to use (currently used only with mesos). + * @param conf the SparkConfig to use + * @param ports a list of port ranges with format: (begin_port_number, end_port_number) + */ + def setPortRangeRestriction(conf: SparkConf, ports: List[(Long, Long)]): Unit = { + // Caution: the property is for internal use only + conf.set("spark.mesos.executor.port.ranges", ports.mkString(":")) } /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index c4dc560031207..e0b4e6e76d331 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.{Ranges, Scalar, Range => MesosRange} import org.apache.mesos.Protos._ import org.apache.mesos.SchedulerDriver import org.mockito.Matchers._ @@ -157,7 +157,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi } test("mesos resource offers result in launching tasks") { - def createOffer(id: Int, mem: Int, cpu: Int): Offer = { + def createOffer(id: Int, mem: Int, cpu: Int, ports: (Long, Long)): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -167,6 +167,10 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder().setBegin(ports._1).setEnd(ports._2).build())) builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) @@ -191,11 +195,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val minMem = backend.calculateTotalMemory(sc) val minCpu = 4 + val offeredPorts = (31100L, 31200L) val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) - mesosOffers.add(createOffer(2, minMem - 1, minCpu)) - mesosOffers.add(createOffer(3, minMem, minCpu)) + mesosOffers.add(createOffer(1, minMem, minCpu, offeredPorts)) + mesosOffers.add(createOffer(2, minMem - 1, minCpu, offeredPorts)) + mesosOffers.add(createOffer(3, minMem, minCpu, offeredPorts)) val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( @@ -242,7 +247,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi // Unwanted resources offered on an existing node. Make sure they are declined val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(1, minMem, minCpu)) + mesosOffers2.add(createOffer(1, minMem, minCpu, offeredPorts)) reset(taskScheduler) reset(driver) when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) @@ -269,6 +274,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.listenerBus).thenReturn(listenerBus) val id = 1 + val (devPortBegin, devPortEnd) = (40000 , 40100) + val (prodPortBegin, prodPortEnd) = (30000, 30100) val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -280,6 +287,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setRole("prod") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(1)) + builder.addResourcesBuilder() + .setName("ports") + .setRole("prod") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange( + MesosRange.newBuilder().setBegin(prodPortBegin).setEnd(prodPortEnd).build())) builder.addResourcesBuilder() .setName("mem") .setRole("dev") @@ -290,6 +303,12 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi .setRole("dev") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(2)) + builder.addResourcesBuilder() + .setName("ports") + .setRole("dev") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange( + MesosRange.newBuilder().setBegin(devPortBegin).setEnd(devPortEnd).build())) val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) From c4a5115ea4bf3d322bd38a95cc497cc07c97c001 Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 18 Jan 2016 18:34:31 +0100 Subject: [PATCH 2/2] fix style --- .../org/apache/spark/executor/MesosExecutorBackend.scala | 2 -- core/src/main/scala/org/apache/spark/util/Utils.scala | 3 +-- .../cluster/mesos/MesosSchedulerBackendSuite.scala | 8 ++++---- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index faf9517f10099..861930192b365 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -32,8 +32,6 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData import org.apache.spark.util.Utils - - private[spark] class MesosExecutorBackend extends MesosExecutor with ExecutorBackend diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 566acd3546bd7..72cc942402134 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2049,8 +2049,7 @@ private[spark] object Utils extends Logging { // try other startFromAvailable() } - } - else { + } else { // try random startFromAvailable(true) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index 2a656a877d695..a2888da703aab 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -26,9 +26,8 @@ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import org.apache.mesos.Protos.Value.{Ranges, Scalar, Range => MesosRange} import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Matchers._ @@ -171,7 +170,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi builder.addResourcesBuilder() .setName("ports") .setType(Value.Type.RANGES) - .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder().setBegin(ports._1).setEnd(ports._2).build())) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(ports._1).setEnd(ports._2).build())) builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) @@ -275,7 +275,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.listenerBus).thenReturn(listenerBus) val id = 1 - val (devPortBegin, devPortEnd) = (40000 , 40100) + val (devPortBegin, devPortEnd) = (40000, 40100) val (prodPortBegin, prodPortEnd) = (30000, 30100) val builder = Offer.newBuilder() builder.addResourcesBuilder()