From 92b47fdabb1022e25936101b162387cbda9495d3 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Fri, 15 May 2015 10:54:07 -0700 Subject: [PATCH 01/24] Add attributes based constraints support to MesosScheduler --- .../mesos/CoarseMesosSchedulerBackend.scala | 9 +- .../cluster/mesos/MesosSchedulerBackend.scala | 7 +- .../cluster/mesos/MesosSchedulerUtils.scala | 103 +++++++++++++++++- 3 files changed, 111 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 6b8edca5aa485..4aefbf2e113e8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{Collections, List => JList} -import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} @@ -66,6 +65,9 @@ private[spark] class CoarseMesosSchedulerBackend( val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) + // Offer constraints + val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + var nextMesosTaskId = 0 @volatile var appId: String = _ @@ -171,7 +173,10 @@ private[spark] class CoarseMesosSchedulerBackend( synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers) { + // filter out all the offers that do not meet constraints (if specified) + val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints) + + for (offer <- qualifyingOffers) { val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 49de85ef48ada..c8ce1476ab7d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -20,7 +20,6 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{ArrayList => JArrayList, Collections, List => JList} -import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} @@ -59,6 +58,9 @@ private[spark] class MesosSchedulerBackend( private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) + // Offer constraints + val offerConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + @volatile var appId: String = _ override def start() { @@ -186,8 +188,9 @@ private[spark] class MesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { + val qualifyingOffers = filterOffersByConstraints(offers, offerConstraints) // Fail-fast on offers we know will be rejected - val (usableOffers, unUsableOffers) = offers.partition { o => + val (usableOffers, unUsableOffers) = qualifyingOffers.partition { o => val mem = 
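/* A sketch of the flow patch 01 introduces, simplified and outside the backend
 * classes (`parseConstraintString` and `filterOffersByConstraints` are the
 * helpers added to MesosSchedulerUtils just below; patch 11 later inlines the
 * filtering into the per-offer checks):
 * {{{
 * // parsed once from configuration when the backend is constructed
 * val constraints: Map[String, Set[String]] =
 *   parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 *
 * // applied on every resourceOffers() callback, before the resource checks
 * val qualifyingOffers = filterOffersByConstraints(offers, constraints)
 * }}}
 */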
getResource(o.getResourcesList, "mem")
       val cpus = getResource(o.getResourcesList, "cpus")
       val slaveId = o.getSlaveId.getValue
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index d11228f3d016a..3f424189844ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,16 +17,20 @@
 package org.apache.spark.scheduler.cluster.mesos
 
-import java.util.List
+import java.util.{List => JList}
 import java.util.concurrent.CountDownLatch
 
+import com.google.common.base.Splitter
+
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
-import org.apache.mesos.Protos.{FrameworkInfo, Resource, Status}
-import org.apache.mesos.{MesosSchedulerDriver, Scheduler}
+import org.apache.mesos.Protos._
+import org.apache.mesos.{Protos, MesosSchedulerDriver, Scheduler}
 import org.apache.spark.Logging
 import org.apache.spark.util.Utils
 
+
 /**
  * Shared trait for implementing a Mesos Scheduler. This holds common state and helper
  * methods and Mesos scheduler will use.
@@ -86,10 +90,101 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   /**
    * Get the amount of resources for the specified type from the resource list
    */
-  protected def getResource(res: List[Resource], name: String): Double = {
+  protected def getResource(res: JList[Resource], name: String): Double = {
     for (r <- res if r.getName == name) {
       return r.getScalar.getValue
     }
     0.0
   }
+
+
+  /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
+  private[mesos] def getAttribute(attr: Attribute): (String, Set[String]) =
+    (attr.getName, attr.getText.getValue.split(',').toSet)
+
+
+  /** Build a Mesos resource protobuf object */
+  private[mesos] def createResource(resourceName: String, quantity: Double): Protos.Resource = {
+    Resource.newBuilder()
+      .setName(resourceName)
+      .setType(Value.Type.SCALAR)
+      .setScalar(Value.Scalar.newBuilder().setValue(quantity).build())
+      .build()
+  }
+
+
+  /**
+   * Match the requirements (if any) to the offer attributes.
+   * if attribute requirements are not specified - return true
+   * else if attribute is defined and no values are given, simple attribute presence is preformed
+   * else if attribute name and value is specified, subset match is performed on slave attributes
+   */
+  private[mesos] def matchesAttributeRequirements(
+      slaveOfferConstraints: Map[String, Set[String]],
+      offerAttributes: Map[String, Set[String]]): Boolean =
+    if (slaveOfferConstraints.isEmpty) {
+      true
+    } else {
+      slaveOfferConstraints.forall {
+        // offer has the required attribute and subsumes the required values for that attribute
+        case (name, requiredValues) =>
+          // The attributes and their values are case sensitive during comparison
+          // i.e tachyon -> true != Tachyon -> true != tachyon -> True
+          offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name))
+
+      }
+    }
+
+  /**
+   * Parses the attributes constraints provided to spark and build a matching data struct:
+   *  Map[<attribute-name>, Set[values-to-match]
+   * The constraints are specified as ';' separated key-value pairs where keys and values
+   * are separated by ':'. The ':' implies equality.
For example: + * {{{ + * parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") + * // would result in + * + * Map( + * "tachyon" -> Set("true"), + * "zone": -> Set("us-east-1a", "us-east-1b") + * ) + * }}} + * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated + * by ':') + * @return Map of constraints to match resources offers. + */ + private[mesos] def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = { + /* + Based on mesos docs: + attributes : attribute ( ";" attribute )* + attribute : labelString ":" ( labelString | "," )+ + labelString : [a-zA-Z0-9_/.-] + */ + val splitter = Splitter.on(';').trimResults().withKeyValueSeparator(':') + // kv splitter + if (constraintsVal.isEmpty) { + Map() + } else { + Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map { + case (k, v) => + if (v == null) { + (k, Set[String]()) + } else { + (k, v.split(',').toSet) + } + } + } + } + + /** + * For the list of offers received, find the ones that match the offer constraints (if specified) + * @param offers set of all offers received + * @return Offers that match the constraints + */ + private[mesos] def filterOffersByConstraints( + offers: JList[Offer], + offerConstraints: Map[String, Set[String]]): mutable.Buffer[Offer] = offers.filter { o => + matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap) + } + } From 72fe88adf8d2d61d317801ca4b2e2b304f0a3849 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sat, 16 May 2015 22:35:32 -0700 Subject: [PATCH 02/24] Fix up tests + remove redundant method override, combine utility class into new mesos scheduler util trait --- .../mesos/CoarseMesosSchedulerBackend.scala | 13 +----- .../scheduler/cluster/mesos/MemoryUtils.scala | 31 -------------- .../cluster/mesos/MesosSchedulerBackend.scala | 4 +- .../cluster/mesos/MesosSchedulerUtils.scala | 18 +++++--- .../mesos/MesosSchedulerBackendSuite.scala | 6 +-- ...e.scala => MesosSchedulerUtilsSuite.scala} | 41 +++++++++++-------- 6 files changed, 45 insertions(+), 68 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala rename core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/{MemoryUtilsSuite.scala => MesosSchedulerUtilsSuite.scala} (51%) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 4aefbf2e113e8..81530c4fbc937 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -181,7 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend( val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt if (totalCoresAcquired < maxCores && - mem >= MemoryUtils.calculateTotalMemory(sc) && + mem >= calculateTotalMemory(sc) && cpus >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && !slaveIdsWithExecutors.contains(slaveId)) { @@ -198,8 +198,7 @@ private[spark] class CoarseMesosSchedulerBackend( .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", - MemoryUtils.calculateTotalMemory(sc))) + .addResources(createResource("mem", 
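/* calculateTotalMemory (the former MemoryUtils.calculateTotalMemory, folded into
 * the shared trait by this patch) resolves to
 *   executorMemory + max(0.10 * executorMemory, 384)
 * unless spark.mesos.executor.memoryOverhead pins the overhead explicitly.
 * Worked examples, matching the test expectations later in the series:
 * {{{
 * //  512 MB executor                        ->  512 + max(51, 384)  =  896
 * // 4096 MB executor                        -> 4096 + max(409, 384) = 4505
 * // 1024 MB executor, memoryOverhead = 512  -> 1024 + 512           = 1536
 * }}}
 */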
calculateTotalMemory(sc))) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil @@ -217,14 +216,6 @@ private[spark] class CoarseMesosSchedulerBackend( } } - /** Build a Mesos resource protobuf object */ - private def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue.toInt diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala deleted file mode 100644 index 8df4f3b554c41..0000000000000 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler.cluster.mesos - -import org.apache.spark.SparkContext - -private[spark] object MemoryUtils { - // These defaults copied from YARN - val OVERHEAD_FRACTION = 0.10 - val OVERHEAD_MINIMUM = 384 - - def calculateTotalMemory(sc: SparkContext): Int = { - sc.conf.getInt("spark.mesos.executor.memoryOverhead", - math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory - } -} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c8ce1476ab7d3..abd0197fabe94 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -124,7 +124,7 @@ private[spark] class MesosSchedulerBackend( .setType(Value.Type.SCALAR) .setScalar( Value.Scalar.newBuilder() - .setValue(MemoryUtils.calculateTotalMemory(sc)).build()) + .setValue(calculateTotalMemory(sc)).build()) .build() val executorInfo = MesosExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -194,7 +194,7 @@ private[spark] class MesosSchedulerBackend( val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - (mem >= MemoryUtils.calculateTotalMemory(sc) && + (mem >= calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) || (slaveIdsWithExecutors.contains(slaveId) && diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 3f424189844ed..f838d4f17ab9a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -17,18 +17,17 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.{List => JList} import java.util.concurrent.CountDownLatch - -import com.google.common.base.Splitter +import java.util.{List => JList} import scala.collection.JavaConversions._ import scala.collection.mutable +import com.google.common.base.Splitter import org.apache.mesos.Protos._ -import org.apache.mesos.{Protos, MesosSchedulerDriver, Scheduler} -import org.apache.spark.Logging +import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkContext} /** @@ -187,4 +186,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging { matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap) } + // These defaults copied from YARN + val OVERHEAD_FRACTION = 0.10 + val OVERHEAD_MINIMUM = 384 + + private[mesos] def calculateTotalMemory(sc: SparkContext): Int = { + sc.conf.getInt("spark.mesos.executor.memoryOverhead", + math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory + } + } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index 68df46a41ddc8..d01837fe78957 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -149,7 +149,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(sc.conf).thenReturn(new SparkConf) when(sc.listenerBus).thenReturn(listenerBus) - val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.calculateTotalMemory(sc) val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] @@ -157,8 +159,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi mesosOffers.add(createOffer(2, minMem - 1, minCpu)) mesosOffers.add(createOffer(3, minMem, minCpu)) - val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala similarity index 51% rename from core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index e72285d03d3ee..59652f19d9ba4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,30 +17,39 @@ package org.apache.spark.scheduler.cluster.mesos +import org.apache.spark.{SparkConf, SparkContext} import org.mockito.Mockito._ +import org.scalatest._ import 
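/* The renamed suite below exercises the trait in two styles that both appear in
 * this series: patch 02 mixes MesosSchedulerUtils into each test body, while
 * patch 12 switches to a single anonymous instance. A minimal sketch of the two
 * idioms (expectations taken from the tests themselves):
 * {{{
 * // patch 02 style: each test body is an anonymous subclass of the trait
 * it should "use at-least minimum overhead" in new MesosSchedulerUtils {
 *   calculateTotalMemory(fixture.sc) shouldBe 896
 * }
 *
 * // patch 12 style: one shared anonymous instance, used from plain test(...) blocks
 * val utils = new MesosSchedulerUtils { }
 * test("use at-least minimum overhead") { utils.calculateTotalMemory(fixture.sc) shouldBe 896 }
 * }}}
 */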
org.scalatest.mock.MockitoSugar -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar { -class MemoryUtilsSuite extends SparkFunSuite with MockitoSugar { - test("MesosMemoryUtils should always override memoryOverhead when it's set") { + def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new { val sparkConf = new SparkConf - val sc = mock[SparkContext] when(sc.conf).thenReturn(sparkConf) + } + + "MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils { + val f = fixture + // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 + when(f.sc.executorMemory).thenReturn(512) + calculateTotalMemory(f.sc) shouldBe 896 + } + it should "use overhead if it is greater than minimum value" in new MesosSchedulerUtils { + val f = fixture // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 - when(sc.executorMemory).thenReturn(512) - assert(MemoryUtils.calculateTotalMemory(sc) === 896) - - // 384 < sc.executorMemory * 0.1 => 4096 + (4096 * 0.1) = 4505.6 - when(sc.executorMemory).thenReturn(4096) - assert(MemoryUtils.calculateTotalMemory(sc) === 4505) - - // set memoryOverhead - sparkConf.set("spark.mesos.executor.memoryOverhead", "100") - assert(MemoryUtils.calculateTotalMemory(sc) === 4196) - sparkConf.set("spark.mesos.executor.memoryOverhead", "400") - assert(MemoryUtils.calculateTotalMemory(sc) === 4496) + when(f.sc.executorMemory).thenReturn(4096) + calculateTotalMemory(f.sc) shouldBe 4505 } + + it should "use spark.mesos.executor.memoryOverhead (if set)" in new MesosSchedulerUtils { + val f = fixture + // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 + when(f.sc.executorMemory).thenReturn(1024) + f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") + calculateTotalMemory(f.sc) shouldBe 1536 + } + } From ec9d9a64d8ce9f003544da49f76bb31961efff47 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sat, 16 May 2015 22:44:30 -0700 Subject: [PATCH 03/24] Add tests for parse constraint string --- .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 59652f19d9ba4..90aa9e12b1579 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -52,4 +52,16 @@ class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar calculateTotalMemory(f.sc) shouldBe 1536 } + it should "parse a non-empty constraint string correctly" in new MesosSchedulerUtils { + val expectedMap = Map( + "tachyon" -> Set("true"), + "zone" -> Set("us-east-1a", "us-east-1b") + ) + parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap) + } + + it should "parse an empty constraint string correctly" in new MesosSchedulerUtils { + parseConstraintString("") should be (Map()) + } + } From addedba8d85b69d8629ce53ebab80c674f35891f Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sun, 17 May 2015 01:09:07 -0700 Subject: [PATCH 04/24] Added test case for malformed constraint string --- .../scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 90aa9e12b1579..3e9763be17db0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -61,7 +61,11 @@ class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar } it should "parse an empty constraint string correctly" in new MesosSchedulerUtils { - parseConstraintString("") should be (Map()) + parseConstraintString("") shouldBe Map() + } + + it should "throw an exception when the input is malformed" in new MesosSchedulerUtils { + an[IllegalArgumentException] should be thrownBy parseConstraintString("tachyon;zone:us-east") } } From 8cc1e8f4c78a1aba85edf27052f896f954582295 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sun, 17 May 2015 16:00:17 -0700 Subject: [PATCH 05/24] Make exception message more explicit about the source of the error --- .../cluster/mesos/MesosSchedulerUtils.scala | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index f838d4f17ab9a..a5847742f8777 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -164,13 +164,18 @@ private[mesos] trait MesosSchedulerUtils extends Logging { if (constraintsVal.isEmpty) { Map() } else { - Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map { - case (k, v) => - if (v == null) { - (k, Set[String]()) - } else { - (k, v.split(',').toSet) - } + try { + Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map { + case (k, v) => + if (v == null) { + (k, Set[String]()) + } else { + (k, v.split(',').toSet) + } + } + } catch { + case e: Throwable => + throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e) } } } From 0c64df674780d61ab16e6590627bfe5374d9d483 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 18 May 2015 11:12:23 -0700 Subject: [PATCH 06/24] Rename overhead fractions to memory_*, fix spacing --- .../cluster/mesos/MesosSchedulerUtils.scala | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index a5847742f8777..641034eaa8b84 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -119,8 +119,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * else if attribute name and value is specified, subset match is performed on slave attributes */ private[mesos] def matchesAttributeRequirements( - slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, Set[String]]): Boolean = + slaveOfferConstraints: Map[String, Set[String]], + offerAttributes: Map[String, Set[String]]): Boolean = if (slaveOfferConstraints.isEmpty) { true } else { @@ -186,18 +186,19 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * 
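* A worked example of the matching semantics, using the constraint string from
* the tests added later in this series ("tachyon:;zone:us-east-1a,us-east-1c";
* the offer attribute values are illustrative):
* {{{
* // offer attributes                                    -> matches?
* // zone=us-east-1a,us-east-1b (no tachyon attribute)   -> false: required key absent
* // tachyon=true, zone=us-east-1a,us-east-1b,us-east-1c -> true: presence + subset
* // tachyon=false, zone=us-east-1a                      -> false: {1a,1c} not a subset of {1a}
* }}}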
@return Offers that match the constraints */ private[mesos] def filterOffersByConstraints( - offers: JList[Offer], - offerConstraints: Map[String, Set[String]]): mutable.Buffer[Offer] = offers.filter { o => + offers: JList[Offer], + offerConstraints: Map[String, Set[String]]): mutable.Buffer[Offer] = offers.filter { o => matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap) } // These defaults copied from YARN - val OVERHEAD_FRACTION = 0.10 - val OVERHEAD_MINIMUM = 384 + private val MEMORY_OVERHEAD_FRACTION = 0.10 + private val MEMORY_OVERHEAD_MINIMUM = 384 private[mesos] def calculateTotalMemory(sc: SparkContext): Int = { sc.conf.getInt("spark.mesos.executor.memoryOverhead", - math.max(OVERHEAD_FRACTION * sc.executorMemory, OVERHEAD_MINIMUM).toInt) + sc.executorMemory + math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + + sc.executorMemory } } From c09ed8481eb2c119a4fdb02d0fca310e32798931 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 18 May 2015 11:43:33 -0700 Subject: [PATCH 07/24] Fixed the access modifier on offerConstraints val to private[mesos] --- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index abd0197fabe94..fd6427be80cb3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -59,7 +59,8 @@ private[spark] class MesosSchedulerBackend( private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) // Offer constraints - val offerConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + private[mesos] val constraintsStr = sc.conf.get("spark.mesos.constraints", "") + private[mesos] val offerConstraints = parseConstraintString(constraintsStr) @volatile var appId: String = _ From 02031e4d01556833350b48592b2a3145c6a5d857 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sat, 6 Jun 2015 21:52:03 -0700 Subject: [PATCH 08/24] Fix scalastyle warnings in tests --- .../scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 3e9763be17db0..c0f92b327b25c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -24,11 +24,14 @@ import org.scalatest.mock.MockitoSugar class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar { + // scalastyle:off structural.type + // this is the documented way of generating fixtures in scalatest def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new { val sparkConf = new SparkConf val sc = mock[SparkContext] when(sc.conf).thenReturn(sparkConf) } + // scalastyle:on structural.type "MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils { val f = fixture From 63f53f4945a44c574523d9e493cfb24b6fdbf3b7 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Sat, 6 Jun 
2015 23:25:21 -0700
Subject: [PATCH 09/24] Update codestyle - uniform style for config values

---
 .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index fd6427be80cb3..88ff9b21b5167 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -59,8 +59,7 @@ private[spark] class MesosSchedulerBackend(
   private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
 
   // Offer constraints
-  private[mesos] val constraintsStr = sc.conf.get("spark.mesos.constraints", "")
-  private[mesos] val offerConstraints = parseConstraintString(constraintsStr)
+  val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", ""))
 
   @volatile var appId: String = _
 
@@ -189,7 +188,7 @@ private[spark] class MesosSchedulerBackend(
    */
   override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
     inClassLoader() {
-      val qualifyingOffers = filterOffersByConstraints(offers, offerConstraints)
+      val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints)
       // Fail-fast on offers we know will be rejected
       val (usableOffers, unUsableOffers) = qualifyingOffers.partition { o =>
         val mem = getResource(o.getResourcesList, "mem")

From 67b58a0ad02b13fcdbee391624fa2058d6ac21a5 Mon Sep 17 00:00:00 2001
From: Ankur Chauhan
Date: Tue, 9 Jun 2015 12:11:19 -0700
Subject: [PATCH 10/24] Add documentation for spark.mesos.constraints

---
 docs/running-on-mesos.md | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 5f1d6daeb27f0..d155556fef18e 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -184,6 +184,14 @@ acquire. By default, it will acquire *all* cores in the cluster (that get offere
 only makes sense if you run just one application at a time. You can cap the maximum number of cores
 using `conf.set("spark.cores.max", "10")` (for example).
 
+You may also make use of `spark.mesos.constraints` to set attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted.
+
+{% highlight scala %}
+conf.set("spark.mesos.constraints", "tachyon:true;us-east-1:false")
+{% endhighlight %}
+
+For example, let's say `spark.mesos.constraints` is set to `tachyon:true;us-east-1:false`; the resource offers will then be checked to see if they meet both of these constraints, and only then will they be accepted to start new executors.
+
 # Mesos Docker Support
 
 Spark can make use of a Mesos Docker containerizer by setting the property `spark.mesos.executor.docker.image`
@@ -298,6 +306,13 @@ See the [configuration page](configuration.html) for information on Spark config
     the final overhead will be this value.
+
+  spark.mesos.constraints
+  Attribute-based constraints to be matched against when accepting resource offers.
+
+  Attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted. Refer to Mesos Attributes & Resources for more information on attributes.
+ + # Troubleshooting and Debugging From fdc09372be9950fa3c0396e0b2710b8d78c68afb Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Wed, 24 Jun 2015 14:59:15 -0700 Subject: [PATCH 11/24] Decline offers that did not meet criteria --- .../mesos/CoarseMesosSchedulerBackend.scala | 25 +++++++++-------- .../cluster/mesos/MesosSchedulerBackend.scala | 27 ++++++++++++------- .../cluster/mesos/MesosSchedulerUtils.scala | 10 ------- 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 81530c4fbc937..dd9f0758e9d00 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,8 +18,9 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File -import java.util.{Collections, List => JList} +import java.util.{List => JList} +import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} @@ -172,15 +173,14 @@ private[spark] class CoarseMesosSchedulerBackend( override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() - - // filter out all the offers that do not meet constraints (if specified) - val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints) - - for (offer <- qualifyingOffers) { + for (offer <- offers) { + val offerAttributes = (offer.getAttributesList map getAttribute).toMap + val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && + if (meetsConstraints && + totalCoresAcquired < maxCores && mem >= calculateTotalMemory(sc) && cpus >= 1 && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && @@ -202,15 +202,14 @@ private[spark] class CoarseMesosSchedulerBackend( sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder()) + .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder) } - d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task.build()), filters) + // accept the offer and launch the task + d.launchTasks(List(offer.getId), List(task.build()), filters) } else { - // Filter it out - d.launchTasks( - Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) + // Decline the offer + d.declineOffer(offer.getId) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 88ff9b21b5167..09414e7a67b62 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -20,6 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{ArrayList => JArrayList, Collections, List => JList} +import 
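/* Behavioural note on patch 11: rejected offers were previously "declined" only
 * implicitly, by launching an empty task list against them; from this patch on
 * both backends decline them explicitly. The two forms, as they appear in the
 * diffs (`d` is the Mesos SchedulerDriver):
 * {{{
 * // before (patch 01):
 * d.launchTasks(Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
 * // after (patch 11):
 * d.declineOffer(offer.getId)
 * }}}
 */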
scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} @@ -188,19 +189,29 @@ private[spark] class MesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { - val qualifyingOffers = filterOffersByConstraints(offers, slaveOfferConstraints) // Fail-fast on offers we know will be rejected - val (usableOffers, unUsableOffers) = qualifyingOffers.partition { o => + val (usableOffers, unUsableOffers) = offers.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - (mem >= calculateTotalMemory(sc) && - // need at least 1 for executor, 1 for task - cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) || - (slaveIdsWithExecutors.contains(slaveId) && - cpus >= scheduler.CPUS_PER_TASK) + val offerAttributes = (o.getAttributesList map getAttribute).toMap + + // check if all constraints are satisfield + // 1. Attribute constraints + // 2. Memory requirements + // 3. CPU requirements + val meetsConstrains = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) + val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) + + // need at least 1 for executor, 1 for task + (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || + (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } + // Decline offers we ruled out immediately + unUsableOffers.foreach(o => d.declineOffer(o.getId)) + val workerOffers = usableOffers.map { o => val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { getResource(o.getResourcesList, "cpus").toInt @@ -254,8 +265,6 @@ private[spark] class MesosSchedulerBackend( d.declineOffer(o.getId) } - // Decline offers we ruled out immediately - unUsableOffers.foreach(o => d.declineOffer(o.getId)) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 641034eaa8b84..5aa57b5a1a41c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -180,16 +180,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } } - /** - * For the list of offers received, find the ones that match the offer constraints (if specified) - * @param offers set of all offers received - * @return Offers that match the constraints - */ - private[mesos] def filterOffersByConstraints( - offers: JList[Offer], - offerConstraints: Map[String, Set[String]]): mutable.Buffer[Offer] = offers.filter { o => - matchesAttributeRequirements(offerConstraints, (o.getAttributesList map getAttribute).toMap) - } // These defaults copied from YARN private val MEMORY_OVERHEAD_FRACTION = 0.10 From 662535f6b95e063099fcd48db80a22688bdd6ea4 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Thu, 25 Jun 2015 12:49:14 -0700 Subject: [PATCH 12/24] Incorporate code review comments + use SparkFunSuite --- .../cluster/mesos/MesosSchedulerBackend.scala | 3 +- .../cluster/mesos/MesosSchedulerUtils.scala | 34 +++++------ .../mesos/MesosSchedulerUtilsSuite.scala | 58 ++++++++++++++----- 3 files changed, 60 insertions(+), 35 deletions(-) 
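The offer-evaluation predicate that patches 11 and 12 converge on, condensed into
one function for reference (a sketch only: names follow the diffs below, and `sc`,
`scheduler`, `mesosExecutorCores`, `slaveIdsWithExecutors` and
`slaveOfferConstraints` are the backend's existing state):

  def isUsable(o: Offer): Boolean = {
    val mem = getResource(o.getResourcesList, "mem")
    val cpus = getResource(o.getResourcesList, "cpus")
    val offerAttributes = (o.getAttributesList map getAttribute).toMap

    val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
    val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
    val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)

    (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
      (slaveIdsWithExecutors.contains(o.getSlaveId.getValue) && cpus >= scheduler.CPUS_PER_TASK)
  }

  // usable offers proceed to task packing; the rest are declined up front:
  // unUsableOffers.foreach(o => d.declineOffer(o.getId))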
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 09414e7a67b62..ca52473c80ada 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -199,12 +199,11 @@ private[spark] class MesosSchedulerBackend( // check if all constraints are satisfield // 1. Attribute constraints // 2. Memory requirements - // 3. CPU requirements + // 3. CPU requirements - need at least 1 for executor, 1 for task val meetsConstrains = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - // need at least 1 for executor, 1 for task (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 5aa57b5a1a41c..439b2d7ecc48d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -21,7 +21,6 @@ import java.util.concurrent.CountDownLatch import java.util.{List => JList} import scala.collection.JavaConversions._ -import scala.collection.mutable import com.google.common.base.Splitter import org.apache.mesos.Protos._ @@ -98,12 +97,12 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ - private[mesos] def getAttribute(attr: Attribute): (String, Set[String]) = + def getAttribute(attr: Attribute): (String, Set[String]) = (attr.getName, attr.getText.getValue.split(',').toSet) /** Build a Mesos resource protobuf object */ - private[mesos] def createResource(resourceName: String, quantity: Double): Protos.Resource = { + def createResource(resourceName: String, quantity: Double): Protos.Resource = { Resource.newBuilder() .setName(resourceName) .setType(Value.Type.SCALAR) @@ -118,27 +117,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * else if attribute is defined and no values are given, simple attribute presence is preformed * else if attribute name and value is specified, subset match is performed on slave attributes */ - private[mesos] def matchesAttributeRequirements( + def matchesAttributeRequirements( slaveOfferConstraints: Map[String, Set[String]], offerAttributes: Map[String, Set[String]]): Boolean = - if (slaveOfferConstraints.isEmpty) { - true - } else { - slaveOfferConstraints.forall { - // offer has the required attribute and subsumes the required values for that attribute - case (name, requiredValues) => - // The attributes and their values are case sensitive during comparison - // i.e tachyon -> true != Tachyon -> true != tachyon -> True - offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name)) + slaveOfferConstraints.forall { + // offer has the required attribute and subsumes the required values for that attribute + case (name, requiredValues) => + // The attributes and their values are case sensitive during comparison + // 
i.e tachyon -> true != Tachyon -> true != tachyon -> True
+        offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name))
     }
 
   /**
    * Parses the attributes constraints provided to spark and build a matching data struct:
-   *  Map[<attribute-name>, Set[values-to-match]
+   *  Map[<attribute-name>, Set[values-to-match]]
    * The constraints are specified as ';' separated key-value pairs where keys and values
-   * are separated by ':'. The ':' implies equality. For example:
+   * are separated by ':'. The ':' implies equality (for singular values) and "is one of" for
+   * multiple values (comma separated). For example:
    * {{{
    *   parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b")
    *   // would result in
@@ -152,7 +148,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    *                       by ':')
    * @return Map of constraints to match resources offers.
    */
-  private[mesos] def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
+  def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {
     /*
       Based on mesos docs:
       attributes : attribute ( ";" attribute )*
@@ -167,7 +163,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
       try {
         Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map {
           case (k, v) =>
-            if (v == null) {
+            if (v == null || v.isEmpty) {
              (k, Set[String]())
            } else {
              (k, v.split(',').toSet)
@@ -185,7 +181,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
   private val MEMORY_OVERHEAD_FRACTION = 0.10
   private val MEMORY_OVERHEAD_MINIMUM = 384
 
-  private[mesos] def calculateTotalMemory(sc: SparkContext): Int = {
+  def calculateTotalMemory(sc: SparkContext): Int = {
     sc.conf.getInt("spark.mesos.executor.memoryOverhead",
       math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index 59652f19d9ba4..8228f9885cbfa 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -17,12 +17,12 @@
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext}
 import org.mockito.Mockito._
 import org.scalatest._
 import org.scalatest.mock.MockitoSugar
 
-class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar {
+class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
 
   // scalastyle:off structural.type
   // this is the documented way of generating fixtures in scalatest
   def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
@@ -31,44 +31,74 @@ class MesosSchedulerUtilsSuite extends FlatSpec with Matchers with MockitoSugar
     val sc = mock[SparkContext]
     when(sc.conf).thenReturn(sparkConf)
   }
+  val utils = new MesosSchedulerUtils { }
   // scalastyle:on structural.type
 
-  "MesosSchedulerUtils" should "use at-least minimum overhead" in new MesosSchedulerUtils {
+  test("use at-least minimum overhead") {
     val f = fixture
     // 384 >
sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(4096) - calculateTotalMemory(f.sc) shouldBe 4505 + utils.calculateTotalMemory(f.sc) shouldBe 4505 } - it should "use spark.mesos.executor.memoryOverhead (if set)" in new MesosSchedulerUtils { + test("use spark.mesos.executor.memoryOverhead (if set)") { val f = fixture + val utils = new MesosSchedulerUtils { } // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(1024) f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") - calculateTotalMemory(f.sc) shouldBe 1536 + utils.calculateTotalMemory(f.sc) shouldBe 1536 } - it should "parse a non-empty constraint string correctly" in new MesosSchedulerUtils { + test("parse a non-empty constraint string correctly") { val expectedMap = Map( "tachyon" -> Set("true"), "zone" -> Set("us-east-1a", "us-east-1b") ) - parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap) + utils.parseConstraintString("tachyon:true;zone:us-east-1a,us-east-1b") should be (expectedMap) } - it should "parse an empty constraint string correctly" in new MesosSchedulerUtils { - parseConstraintString("") shouldBe Map() + test("parse an empty constraint string correctly") { + val utils = new MesosSchedulerUtils { } + utils.parseConstraintString("") shouldBe Map() } - it should "throw an exception when the input is malformed" in new MesosSchedulerUtils { - an[IllegalArgumentException] should be thrownBy parseConstraintString("tachyon;zone:us-east") + test("throw an exception when the input is malformed") { + an[IllegalArgumentException] should be thrownBy + utils.parseConstraintString("tachyon;zone:us-east") + } + + test("empty values for attributes' constraints matches all values") { + val constraintsStr = "tachyon:" + val parsedConstraints = utils.parseConstraintString(constraintsStr) + + parsedConstraints shouldBe Map("tachyon" -> Set()) + + val `offer with no tachyon` = Map("zone" -> Set("us-east-1a", "us-east-1b")) + val `offer with tachyon:true` = Map("tachyon" -> Set("true")) + val `offer with tachyon:false` = Map("tachyon" -> Set("false")) + + utils.matchesAttributeRequirements(parsedConstraints, `offer with no tachyon`) shouldBe false + utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:true`) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:false`) shouldBe true + } + + test("subset match is performed constraint attributes") { + val `constraint with superset` = Map( + "tachyon" -> Set("true"), + "zone" -> Set("us-east-1a", "us-east-1b", "us-east-1c")) + + val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c" + val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) + + utils.matchesAttributeRequirements(parsedConstraints, `constraint with superset`) shouldBe true } } From 00be2523c2cb86f3ef47d1d1876c57fdb8d1f80d Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Fri, 26 Jun 2015 09:29:15 -0700 Subject: [PATCH 13/24] Style changes as per code review comments --- .../mesos/CoarseMesosSchedulerBackend.scala | 3 ++- .../cluster/mesos/MesosSchedulerUtils.scala | 4 ++++ .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 16 ++++++++-------- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index dd9f0758e9d00..0210659df6f1e 100644 --- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -67,7 +67,8 @@ private[spark] class CoarseMesosSchedulerBackend( val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) // Offer constraints - val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + private val slaveOfferConstraints = + parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) var nextMesosTaskId = 0 diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 439b2d7ecc48d..cecfd0908a599 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -23,6 +23,7 @@ import java.util.{List => JList} import scala.collection.JavaConversions._ import com.google.common.base.Splitter + import org.apache.mesos.Protos._ import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} import org.apache.spark.util.Utils @@ -144,6 +145,9 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * "zone": -> Set("us-east-1a", "us-east-1b") * ) * }}} + * + * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/ + * * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated * by ':') * @return Map of constraints to match resources offers. diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 8228f9885cbfa..720922abe2de1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -81,24 +81,24 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS parsedConstraints shouldBe Map("tachyon" -> Set()) - val `offer with no tachyon` = Map("zone" -> Set("us-east-1a", "us-east-1b")) - val `offer with tachyon:true` = Map("tachyon" -> Set("true")) - val `offer with tachyon:false` = Map("tachyon" -> Set("false")) + val noTachyonOffer = Map("zone" -> Set("us-east-1a", "us-east-1b")) + val tachyonTrueOffer = Map("tachyon" -> Set("true")) + val tachyonFalseOffer = Map("tachyon" -> Set("false")) - utils.matchesAttributeRequirements(parsedConstraints, `offer with no tachyon`) shouldBe false - utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:true`) shouldBe true - utils.matchesAttributeRequirements(parsedConstraints, `offer with tachyon:false`) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false + utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true } test("subset match is performed constraint attributes") { - val `constraint with superset` = Map( + val supersetConstraint = Map( "tachyon" -> Set("true"), "zone" -> Set("us-east-1a", "us-east-1b", "us-east-1c")) val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c" val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) - 
utils.matchesAttributeRequirements(parsedConstraints, `constraint with superset`) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true } } From fc7eb5b57d4138628a8f122d672f7ded9bf705ed Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Fri, 26 Jun 2015 10:44:00 -0700 Subject: [PATCH 14/24] Fix import codestyle --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 1 + .../scheduler/cluster/mesos/MesosClusterScheduler.scala | 1 + .../scheduler/cluster/mesos/MesosSchedulerUtils.scala | 8 ++++---- .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 3 ++- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 0210659df6f1e..6361193340cc4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.{Scheduler => MScheduler, _} + import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 1067a7f1caf4c..d3a20f822176e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -29,6 +29,7 @@ import org.apache.mesos.Protos.Environment.Variable import org.apache.mesos.Protos.TaskStatus.Reason import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.{Scheduler, SchedulerDriver} + import org.apache.spark.deploy.mesos.MesosDriverDescription import org.apache.spark.deploy.rest.{CreateSubmissionResponse, KillSubmissionResponse, SubmissionStatusResponse} import org.apache.spark.metrics.MetricsSystem diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index cecfd0908a599..0aaedd00a7b87 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -17,17 +17,17 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.concurrent.CountDownLatch import java.util.{List => JList} +import java.util.concurrent.CountDownLatch import scala.collection.JavaConversions._ import com.google.common.base.Splitter - -import org.apache.mesos.Protos._ import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} -import org.apache.spark.util.Utils +import org.apache.mesos.Protos._ + import org.apache.spark.{Logging, SparkContext} +import org.apache.spark.util.Utils /** diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 720922abe2de1..d73f790757b7a 100644 --- 
a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,11 +17,12 @@ package org.apache.spark.scheduler.cluster.mesos -import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext} import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar +import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext} + class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { // scalastyle:off structural.type From 7fee0eab861503dd5188942649d5621777db40f7 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 10:31:09 -0700 Subject: [PATCH 15/24] Add debug statements --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 2 ++ .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 8 +++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 6361193340cc4..7df791fd3b250 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -208,9 +208,11 @@ private[spark] class CoarseMesosSchedulerBackend( } // accept the offer and launch the task + logDebug(s"Accepting offer: ${offer.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") d.launchTasks(List(offer.getId), List(task.build()), filters) } else { // Decline the offer + logDebug(s"Declining offer: ${offer.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") d.declineOffer(offer.getId) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index ca52473c80ada..0eeacaa002199 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -204,8 +204,14 @@ private[spark] class MesosSchedulerBackend( val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || + val meetsRequirements = (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) + + // add some debug messaging + val debugStatement = if(meetsRequirements) "Accepting" else "Declining" + logDebug(s"$debugStatement offer: ${o.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") + + meetsRequirements } // Decline offers we ruled out immediately From c0cbc75ea84c9f768e36440ba4d2f32e36574a08 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 10:35:56 -0700 Subject: [PATCH 16/24] Use offer id value for debug message --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 5 +++-- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 3 ++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 7df791fd3b250..a46e771881305 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -181,6 +181,7 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt + val id = offer.getId.getValue if (meetsConstraints && totalCoresAcquired < maxCores && mem >= calculateTotalMemory(sc) && @@ -208,11 +209,11 @@ private[spark] class CoarseMesosSchedulerBackend( } // accept the offer and launch the task - logDebug(s"Accepting offer: ${offer.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") + logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") d.launchTasks(List(offer.getId), List(task.build()), filters) } else { // Decline the offer - logDebug(s"Declining offer: ${offer.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") + logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") d.declineOffer(offer.getId) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 0eeacaa002199..f0b9204f642d3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -209,7 +209,8 @@ private[spark] class MesosSchedulerBackend( // add some debug messaging val debugStatement = if(meetsRequirements) "Accepting" else "Declining" - logDebug(s"$debugStatement offer: ${o.getId} with attributes: $offerAttributes mem: $mem cpu: $cpus") + val id = o.getId.getValue + logDebug(s"$debugStatement offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") meetsRequirements } From 1bce7822bc42b642d9938b968db2d447d3cce917 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 11:13:19 -0700 Subject: [PATCH 17/24] Fix nit pick whitespace --- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 5 +++-- .../spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 4 ---- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index f0b9204f642d3..0742acd2fbc80 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -60,7 +60,8 @@ private[spark] class MesosSchedulerBackend( private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) // Offer constraints - val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) + private[mesos] val slaveOfferConstraints = + parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) @volatile var appId: String = _ @@ -205,7 +206,7 @@ private[spark] class MesosSchedulerBackend( val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) val meetsRequirements = (meetsConstrains && 
meetsMemoryRequirements && meetsCPURequirements) || - (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) + (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) // add some debug messaging val debugStatement = if(meetsRequirements) "Accepting" else "Declining" diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 0aaedd00a7b87..1f87629fb1041 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -29,7 +29,6 @@ import org.apache.mesos.Protos._ import org.apache.spark.{Logging, SparkContext} import org.apache.spark.util.Utils - /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper * methods and Mesos scheduler will use. @@ -96,7 +95,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { 0.0 } - /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ def getAttribute(attr: Attribute): (String, Set[String]) = (attr.getName, attr.getText.getValue.split(',').toSet) @@ -111,7 +109,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { .build() } - /** * Match the requirements (if any) to the offer attributes. * if attribute requirements are not specified - return true @@ -180,7 +177,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } } - // These defaults copied from YARN private val MEMORY_OVERHEAD_FRACTION = 0.10 private val MEMORY_OVERHEAD_MINIMUM = 384 From 5ccc32d7ed0d90e144ff7607c737e40773587f8f Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 11:15:06 -0700 Subject: [PATCH 18/24] Fix nit pick whitespace --- .../spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 1f87629fb1041..33a2b72038600 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -99,7 +99,6 @@ private[mesos] trait MesosSchedulerUtils extends Logging { def getAttribute(attr: Attribute): (String, Set[String]) = (attr.getName, attr.getText.getValue.split(',').toSet) - /** Build a Mesos resource protobuf object */ def createResource(resourceName: String, quantity: Double): Protos.Resource = { Resource.newBuilder() From 482fd716f6b329494ba34f51c81bd609bead6d97 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 11:59:27 -0700 Subject: [PATCH 19/24] Update access modifier to private[this] for offer constraints --- .../spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 0742acd2fbc80..0d6c1f7205b3a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -60,7 +60,7 @@ private[spark] class MesosSchedulerBackend( 
private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) // Offer constraints - private[mesos] val slaveOfferConstraints = + private[this] val slaveOfferConstraints = parseConstraintString(sc.conf.get("spark.mesos.constraints", "")) @volatile var appId: String = _ From 1a24d0bbffd6d08838a10861f86bac142d3ae87b Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 16:51:55 -0700 Subject: [PATCH 20/24] Expand scope of attributes matching to include all data types --- .../mesos/CoarseMesosSchedulerBackend.scala | 9 ++- .../cluster/mesos/MesosSchedulerBackend.scala | 2 +- .../cluster/mesos/MesosSchedulerUtils.scala | 60 +++++++++++++++---- .../mesos/MesosSchedulerUtilsSuite.scala | 56 ++++++++++++++--- 4 files changed, 101 insertions(+), 26 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index a46e771881305..0c020fc8cc84c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -20,18 +20,17 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{List => JList} -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, HashSet} - import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.{Scheduler => MScheduler, _} - import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, HashSet} + /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever @@ -176,7 +175,7 @@ private[spark] class CoarseMesosSchedulerBackend( synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() for (offer <- offers) { - val offerAttributes = (offer.getAttributesList map getAttribute).toMap + val offerAttributes = toAttributeMap(offer.getAttributesList) val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 0d6c1f7205b3a..9dc6d9381cdd3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -195,7 +195,7 @@ private[spark] class MesosSchedulerBackend( val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - val offerAttributes = (o.getAttributesList map getAttribute).toMap + val offerAttributes = toAttributeMap(o.getAttributesList) // check if all constraints are satisfield // 1. 
Attribute constraints diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 33a2b72038600..f31a4c7e72557 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -17,17 +17,17 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.{List => JList} import java.util.concurrent.CountDownLatch - -import scala.collection.JavaConversions._ +import java.util.{List => JList} import com.google.common.base.Splitter -import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} import org.apache.mesos.Protos._ - -import org.apache.spark.{Logging, SparkContext} +import org.apache.mesos.protobuf.GeneratedMessage +import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} import org.apache.spark.util.Utils +import org.apache.spark.{Logging, SparkContext} + +import scala.collection.JavaConversions._ /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper @@ -108,6 +108,23 @@ private[mesos] trait MesosSchedulerUtils extends Logging { .build() } + /** + * Converts the attributes from the resource offer into a Map of name -> Attribute Value + * The attribute values are the mesos attribute types and they are + * @param offerAttributes + * @return + */ + def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = + offerAttributes.map(attr => { + val attrValue = attr.getType match { + case Value.Type.SCALAR => attr.getScalar + case Value.Type.RANGES => attr.getRanges + case Value.Type.SET => attr.getSet + case Value.Type.TEXT => attr.getText + } + (attr.getName, attrValue) + }).toMap + /** * Match the requirements (if any) to the offer attributes. * if attribute requirements are not specified - return true @@ -116,15 +133,32 @@ private[mesos] trait MesosSchedulerUtils extends Logging { */ def matchesAttributeRequirements( slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, Set[String]]): Boolean = + offerAttributes: Map[String, GeneratedMessage]): Boolean = slaveOfferConstraints.forall { // offer has the required attribute and subsumes the required values for that attribute case (name, requiredValues) => - // The attributes and their values are case sensitive during comparison - // i.e tachyon -> true != Tachyon -> true != tachyon -> True - offerAttributes.contains(name) && requiredValues.subsetOf(offerAttributes(name)) - - } + offerAttributes.get(name) match { + case None => false + case Some(_) if requiredValues.isEmpty => true // empty value matches presence + case Some(scalarValue: Value.Scalar) => + // check if provided values is less than equal to the offered values + requiredValues.map(_.toDouble).exists(_ <= scalarValue.getValue) + case Some(rangeValue: Value.Range) => + val offerRange = rangeValue.getBegin to rangeValue.getEnd + // Check if there is some required value that is between the ranges specified + // Note: We only support the ability to specify discrete values, in the future + // we may expand it to subsume ranges specified with a XX..YY value or something + // similar to that. 
+ requiredValues.map(_.toLong).exists(offerRange.contains(_)) + case Some(offeredValue: Value.Set) => + // check if the specified required values is a subset of offered set + requiredValues.subsetOf(offeredValue.getItemList.toSet) + case Some(textValue: Value.Text) => + // check if the specified value is equal, if multiple values are specified + // we succeed if any of them match. + requiredValues.contains(textValue.getValue) + } + } /** * Parses the attributes constraints provided to spark and build a matching data struct: @@ -143,6 +177,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * }}} * * Mesos documentation: http://mesos.apache.org/documentation/attributes-resources/ + * https://github.com/apache/mesos/blob/master/src/common/values.cpp + * https://github.com/apache/mesos/blob/master/src/common/attributes.cpp * * @param constraintsVal constaints string consisting of ';' separated key-value pairs (separated * by ':') diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index d73f790757b7a..56a130d29e747 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,11 +17,11 @@ package org.apache.spark.scheduler.cluster.mesos +import org.apache.mesos.Protos.Value import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar - -import org.apache.spark.{SparkFunSuite, SparkConf, SparkContext} +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { @@ -82,19 +82,24 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS parsedConstraints shouldBe Map("tachyon" -> Set()) - val noTachyonOffer = Map("zone" -> Set("us-east-1a", "us-east-1b")) - val tachyonTrueOffer = Map("tachyon" -> Set("true")) - val tachyonFalseOffer = Map("tachyon" -> Set("false")) + val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build() + val noTachyonOffer = Map("zone" -> zoneSet) + val tachyonTrueOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build()) + val tachyonFalseOffer = Map("tachyon" -> Value.Text.newBuilder().setValue("false").build()) utils.matchesAttributeRequirements(parsedConstraints, noTachyonOffer) shouldBe false utils.matchesAttributeRequirements(parsedConstraints, tachyonTrueOffer) shouldBe true utils.matchesAttributeRequirements(parsedConstraints, tachyonFalseOffer) shouldBe true } - test("subset match is performed constraint attributes") { + test("subset match is performed for set attributes") { val supersetConstraint = Map( - "tachyon" -> Set("true"), - "zone" -> Set("us-east-1a", "us-east-1b", "us-east-1c")) + "tachyon" -> Value.Text.newBuilder().setValue("true").build(), + "zone" -> Value.Set.newBuilder() + .addItem("us-east-1a") + .addItem("us-east-1b") + .addItem("us-east-1c") + .build()) val zoneConstraintStr = "tachyon:;zone:us-east-1a,us-east-1c" val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) @@ -102,4 +107,39 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true } + test("less than equal match is performed on scalar attributes") { + val 
offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build()) + + val ltConstraint = utils.parseConstraintString("gpus:2") + val eqConstraint = utils.parseConstraintString("gpus:3") + val gtConstraint = utils.parseConstraintString("gpus:4") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false + } + + test("contains match is performed for range attributes") { + val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build()) + val ltConstraint = utils.parseConstraintString("ports:6000") + val eqConstraint = utils.parseConstraintString("ports:7500") + val gtConstraint = utils.parseConstraintString("ports:8002") + val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false + utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true + } + + test("equality match is performed for text attributes") { + val offerAttribs = Map("tachyon" -> Value.Text.newBuilder().setValue("true").build()) + + val trueConstraint = utils.parseConstraintString("tachyon:true") + val falseConstraint = utils.parseConstraintString("tachyon:false") + + utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false + } + } From c3523e77a43bae517686383a50128f0de0946cab Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 16:59:07 -0700 Subject: [PATCH 21/24] Added docs --- docs/running-on-mesos.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index d155556fef18e..1f915d8ea1d73 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -311,6 +311,13 @@ See the [configuration page](configuration.html) for information on Spark config Attribute based constraints to be matched against when accepting resource offers. Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to Mesos Attributes & Resources for more information on attributes. +
+    <ul>
+    <li>Scalar constraints are matched with "less than equal" semantics, i.e. the value in the constraint must be less than or equal to the value in the resource offer.</li>
+    <li>Range constraints are matched with "contains" semantics, i.e. the value in the constraint must be within the resource offer's value.</li>
+    <li>Set constraints are matched with "subset of" semantics, i.e. the value in the constraint must be a subset of the resource offer's value.</li>
+    <li>Text constraints are matched with "equality" semantics, i.e. the value in the constraint must be exactly equal to the resource offer's value.</li>
+    <li>In case there is no value present as a part of the constraint, any offer with the corresponding attribute will be accepted (without value check).</li>
+    </ul>
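
To make the semantics documented above concrete, here is a minimal usage sketch of the two helpers these patches introduce (parseConstraintString and matchesAttributeRequirements). It is not itself part of any patch: the object name is hypothetical, and it assumes compilation inside the org.apache.spark.scheduler.cluster.mesos package so the private[mesos] trait is visible — the same trick MesosSchedulerUtilsSuite uses.

    package org.apache.spark.scheduler.cluster.mesos

    import org.apache.mesos.Protos.Value

    // Hypothetical example object; mirrors the style of MesosSchedulerUtilsSuite.
    object ConstraintMatchingSketch {
      private val utils = new MesosSchedulerUtils { }

      def main(args: Array[String]): Unit = {
        // "zone:us-east-1a,us-east-1b;tachyon:true" parses to
        // Map("zone" -> Set("us-east-1a", "us-east-1b"), "tachyon" -> Set("true"))
        val constraints =
          utils.parseConstraintString("zone:us-east-1a,us-east-1b;tachyon:true")

        // An offer advertising a SET attribute zone = {us-east-1a, us-east-1b,
        // us-east-1c} and a TEXT attribute tachyon = "true".
        val offer = Map(
          "zone" -> Value.Set.newBuilder()
            .addItem("us-east-1a").addItem("us-east-1b").addItem("us-east-1c")
            .build(),
          "tachyon" -> Value.Text.newBuilder().setValue("true").build())

        // Subset semantics for the set attribute, equality for the text
        // attribute: both constraints hold, so the offer qualifies.
        assert(utils.matchesAttributeRequirements(constraints, offer))

        // Same constraints against an offer whose zone set lacks us-east-1b:
        // the subset check fails, so this offer would be declined.
        val smallerZone = Map(
          "zone" -> Value.Set.newBuilder().addItem("us-east-1a").build(),
          "tachyon" -> Value.Text.newBuilder().setValue("true").build())
        assert(!utils.matchesAttributeRequirements(constraints, smallerZone))
      }
    }

These are the same checks that CoarseMesosSchedulerBackend and MesosSchedulerBackend run per offer ahead of the memory/CPU tests, logging "Accepting offer" or "Declining offer" at debug level as added in the patches above.
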
From 8b73f2dba33d191ef6138b39ca9638f9dfd5c327 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Mon, 29 Jun 2015 21:17:55 -0700 Subject: [PATCH 22/24] Fix imports --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 10 +++++----- .../cluster/mesos/MesosSchedulerBackend.scala | 4 ++-- .../scheduler/cluster/mesos/MesosSchedulerUtils.scala | 10 +++++----- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 0c020fc8cc84c..b68f8c7685eba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -20,16 +20,16 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{List => JList} -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import scala.collection.JavaConversions._ +import scala.collection.mutable.{HashMap, HashSet} + import org.apache.mesos.{Scheduler => MScheduler, _} +import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} +import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils -import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, HashSet} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 9dc6d9381cdd3..c28e5577a059b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -23,14 +23,14 @@ import java.util.{ArrayList => JArrayList, Collections, List => JList} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} +import org.apache.mesos.{Scheduler => MScheduler, _} import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} import org.apache.mesos.protobuf.ByteString -import org.apache.mesos.{Scheduler => MScheduler, _} +import org.apache.spark.{SparkContext, SparkException, TaskState} import org.apache.spark.executor.MesosExecutorBackend import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils -import org.apache.spark.{SparkContext, SparkException, TaskState} /** * A SchedulerBackend for running fine-grained tasks on Mesos. 
Each Spark task is mapped to a diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index f31a4c7e72557..d70bd636356cc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -17,17 +17,17 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.concurrent.CountDownLatch import java.util.{List => JList} +import java.util.concurrent.CountDownLatch + +import scala.collection.JavaConversions._ import com.google.common.base.Splitter +import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} import org.apache.mesos.Protos._ import org.apache.mesos.protobuf.GeneratedMessage -import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} -import org.apache.spark.util.Utils import org.apache.spark.{Logging, SparkContext} - -import scala.collection.JavaConversions._ +import org.apache.spark.util.Utils /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper From d83801ce81fd5d0b475df52f9c82516c7cd95f5b Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Wed, 1 Jul 2015 18:11:24 -0700 Subject: [PATCH 23/24] Update code as per code review comments --- .../cluster/mesos/MesosSchedulerBackend.scala | 28 +++++++++---------- .../cluster/mesos/MesosSchedulerUtils.scala | 23 +++++++++++---- .../mesos/MesosSchedulerUtilsSuite.scala | 5 ---- 3 files changed, 31 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c28e5577a059b..c7bcbeeac0c89 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -75,8 +75,8 @@ private[spark] class MesosSchedulerBackend( val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } + throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + } val environment = Environment.newBuilder() sc.conf.getOption("spark.executor.extraClassPath").foreach { cp => environment.addVariables( @@ -119,7 +119,7 @@ private[spark] class MesosSchedulerBackend( .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(mesosExecutorCores).build()) + .setValue(mesosExecutorCores).build()) .build() val memory = Resource.newBuilder() .setName("mem") @@ -201,15 +201,15 @@ private[spark] class MesosSchedulerBackend( // 1. Attribute constraints // 2. Memory requirements // 3. 
CPU requirements - need at least 1 for executor, 1 for task - val meetsConstrains = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) + val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - val meetsRequirements = (meetsConstrains && meetsMemoryRequirements && meetsCPURequirements) || + val meetsRequirements = (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) // add some debug messaging - val debugStatement = if(meetsRequirements) "Accepting" else "Declining" + val debugStatement = if (meetsRequirements) "Accepting" else "Declining" val id = o.getId.getValue logDebug(s"$debugStatement offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") @@ -244,15 +244,15 @@ private[spark] class MesosSchedulerBackend( val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) acceptedOffers .foreach { offer => - offer.foreach { taskDesc => - val slaveId = taskDesc.executorId - slaveIdsWithExecutors += slaveId - slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(createMesosTask(taskDesc, slaveId)) - } + offer.foreach { taskDesc => + val slaveId = taskDesc.executorId + slaveIdsWithExecutors += slaveId + slavesIdsOfAcceptedOffers += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(createMesosTask(taskDesc, slaveId)) } + } // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index d70bd636356cc..a06b69e22f5da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -21,6 +21,7 @@ import java.util.{List => JList} import java.util.concurrent.CountDownLatch import scala.collection.JavaConversions._ +import scala.util.control.NonFatal import com.google.common.base.Splitter import org.apache.mesos.{MesosSchedulerDriver, Protos, Scheduler} @@ -96,11 +97,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ - def getAttribute(attr: Attribute): (String, Set[String]) = + protected def getAttribute(attr: Attribute): (String, Set[String]) = { (attr.getName, attr.getText.getValue.split(',').toSet) + } + /** Build a Mesos resource protobuf object */ - def createResource(resourceName: String, quantity: Double): Protos.Resource = { + protected def createResource(resourceName: String, quantity: Double): Protos.Resource = { Resource.newBuilder() .setName(resourceName) .setType(Value.Type.SCALAR) @@ -114,7 +117,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * @param offerAttributes * @return */ - def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = + protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { offerAttributes.map(attr => { val attrValue = attr.getType match { case Value.Type.SCALAR => attr.getScalar @@ -124,16 +127,18 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } (attr.getName, attrValue) }).toMap + } + /** * Match the requirements (if any) to the offer attributes. * if attribute requirements are not specified - return true - * else if attribute is defined and no values are given, simple attribute presence is preformed + * else if attribute is defined and no values are given, simple attribute presence is performed * else if attribute name and value is specified, subset match is performed on slave attributes */ def matchesAttributeRequirements( slaveOfferConstraints: Map[String, Set[String]], - offerAttributes: Map[String, GeneratedMessage]): Boolean = + offerAttributes: Map[String, GeneratedMessage]): Boolean = { slaveOfferConstraints.forall { // offer has the required attribute and subsumes the required values for that attribute case (name, requiredValues) => @@ -158,6 +163,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { // we succeed if any of them match. requiredValues.contains(textValue.getValue) } + } } /** @@ -206,7 +212,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } } } catch { - case e: Throwable => + case NonFatal(e) => throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e) } } @@ -216,6 +222,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging { private val MEMORY_OVERHEAD_FRACTION = 0.10 private val MEMORY_OVERHEAD_MINIMUM = 384 + /** + * Return the amount of memory to allocate to each executor, taking into account container overheads. 
+ * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value + * @return memory requirement as (0.1 * ) or MEMORY_OVERHEAD_MINIMUM (whichever is larger) + */ def calculateTotalMemory(sc: SparkContext): Int = { sc.conf.getInt("spark.mesos.executor.memoryOverhead", math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) + diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index 56a130d29e747..b354914b6ffd0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -37,22 +37,18 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS test("use at-least minimum overhead") { val f = fixture - // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(512) utils.calculateTotalMemory(f.sc) shouldBe 896 } test("use overhead if it is greater than minimum value") { val f = fixture - // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(4096) utils.calculateTotalMemory(f.sc) shouldBe 4505 } test("use spark.mesos.executor.memoryOverhead (if set)") { val f = fixture - val utils = new MesosSchedulerUtils { } - // 384 > sc.executorMemory * 0.1 => 512 + 384 = 896 when(f.sc.executorMemory).thenReturn(1024) f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") utils.calculateTotalMemory(f.sc) shouldBe 1536 @@ -67,7 +63,6 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS } test("parse an empty constraint string correctly") { - val utils = new MesosSchedulerUtils { } utils.parseConstraintString("") shouldBe Map() } From 902535b1d9a8e5e8b11428bbdde5994c15b83223 Mon Sep 17 00:00:00 2001 From: Ankur Chauhan Date: Wed, 1 Jul 2015 18:15:55 -0700 Subject: [PATCH 24/24] Fix line length --- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 7 ++++--- .../scheduler/cluster/mesos/MesosSchedulerUtils.scala | 6 ++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index c7bcbeeac0c89..d72e2af456e15 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -205,13 +205,14 @@ private[spark] class MesosSchedulerBackend( val meetsMemoryRequirements = mem >= calculateTotalMemory(sc) val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK) - val meetsRequirements = (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || + val meetsRequirements = + (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) // add some debug messaging - val debugStatement = if (meetsRequirements) "Accepting" else "Declining" + val debugstr = if (meetsRequirements) "Accepting" else "Declining" val id = o.getId.getValue - logDebug(s"$debugStatement offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") + logDebug(s"$debugstr offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") 
meetsRequirements } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index a06b69e22f5da..d8a8c848bb4d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -223,9 +223,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging { private val MEMORY_OVERHEAD_MINIMUM = 384 /** - * Return the amount of memory to allocate to each executor, taking into account container overheads. + * Return the amount of memory to allocate to each executor, taking into account + * container overheads. * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value - * @return memory requirement as (0.1 * ) or MEMORY_OVERHEAD_MINIMUM (whichever is larger) + * @return memory requirement as (0.1 * ) or MEMORY_OVERHEAD_MINIMUM + * (whichever is larger) */ def calculateTotalMemory(sc: SparkContext): Int = { sc.conf.getInt("spark.mesos.executor.memoryOverhead",
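
The calculateTotalMemory arithmetic documented in the hunk above can be spelled out with concrete numbers. The following standalone sketch is not from the patches: the constants are copied from MesosSchedulerUtils, the expected values come from MesosSchedulerUtilsSuite, and the totalMemory helper (with overheadOverride standing in for spark.mesos.executor.memoryOverhead) is hypothetical.

    object MemoryOverheadSketch {
      // Defaults copied from MesosSchedulerUtils (originally borrowed from YARN).
      private val MEMORY_OVERHEAD_FRACTION = 0.10
      private val MEMORY_OVERHEAD_MINIMUM = 384

      // overheadOverride models spark.mesos.executor.memoryOverhead being set.
      def totalMemory(executorMemory: Int, overheadOverride: Option[Int] = None): Int = {
        val overhead = overheadOverride.getOrElse(
          math.max(MEMORY_OVERHEAD_FRACTION * executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt)
        executorMemory + overhead
      }

      def main(args: Array[String]): Unit = {
        assert(totalMemory(512) == 896)              // 384 > 51.2, so 512 + 384
        assert(totalMemory(4096) == 4505)            // 409.6 > 384, so 4096 + 409
        assert(totalMemory(1024, Some(512)) == 1536) // explicit override: 1024 + 512
      }
    }
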