diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 73bd4c58e16f..d00004fe6be9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -472,7 +472,8 @@ private[spark] class MesosClusterScheduler( private class ResourceOffer( val offerId: OfferID, val slaveId: SlaveID, - var resources: JList[Resource]) { + var resources: JList[Resource], + var attributes: JList[Attribute]) { override def toString(): String = { s"Offer id: ${offerId}, resources: ${resources}" } @@ -491,8 +492,14 @@ private[spark] class MesosClusterScheduler( for (submission <- candidates) { val driverCpu = submission.cores val driverMem = submission.mem - logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem") + val submissionConstraints = parseConstraintString( + submission.schedulerProperties.get("spark.mesos.constraints").getOrElse("")) + logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " + + "constraints: $submissionConstraints") val offerOption = currentOffers.find { o => + val offerAttributes = toAttributeMap(o.attributes); + val meetsConstraints = matchesAttributeRequirements(submissionConstraints, offerAttributes); + meetsConstraints && getResource(o.resources, "cpus") >= driverCpu && getResource(o.resources, "mem") >= driverMem } @@ -547,7 +554,7 @@ private[spark] class MesosClusterScheduler( val currentTime = new Date() val currentOffers = offers.asScala.map { - o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList) + o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, o.getAttributesList) }.toList stateLock.synchronized { diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 7f21d4c623af..a23f04ac888f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos._ import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.Text import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -252,6 +253,36 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("can accept offers meeting attribute requirements") { + setBackend(Map("spark.mesos.constraints" -> "zone:us-central1-a")) + + val minMem = backend.executorMemory(sc) + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + val offer = createOffer("o1", "s1", minMem, minCpu, + Map("zone" -> "us-central1-a")) + mesosOffers.add(offer) + backend.resourceOffers(driver, mesosOffers) + + verifyTaskLaunched("o1") + } + + test("cannot accept offers not meeting attribute requirements") { + setBackend(Map("spark.mesos.constraints" -> "zone:us-central1-a")) + + val minMem = backend.executorMemory(sc) + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + val offer = createOffer("o1", "s1", minMem, minCpu, + Map("zone" -> "us-central1-b")) + mesosOffers.add(offer) + backend.resourceOffers(driver, mesosOffers) + + verifyDeclinedOffer(driver, createOfferId("o1"), true) + } + private def verifyDeclinedOffer(driver: SchedulerDriver, offerId: OfferID, filter: Boolean = false): Unit = { @@ -302,7 +333,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite TaskID.newBuilder().setValue(taskId).build() } - private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { + private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int, + attributes: Map[String, String] = null): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -317,7 +349,15 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite .setValue("f1")) .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) .setHostname(s"host${slaveId}") - .build() + if (attributes != null) { + for (attr <- attributes) { + builder.addAttributesBuilder() + .setName(attr._1) + .setType(Value.Type.TEXT) + .setText(Text.newBuilder().setValue(attr._2)) + } + } + builder.build() } private def createSchedulerBackend( diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala index 41693b1191a3..97ef070cb303 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.apache.mesos.Protos._ import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.Text import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -380,4 +381,158 @@ class MesosFineGrainedSchedulerBackendSuite r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") }) } + + test("can accept offers meeting attribute requirements") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + val conf = new SparkConf() + .set("spark.mesos.constraints", "zone:us-central1-a") + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(conf) + when(sc.listenerBus).thenReturn(listenerBus) + + val id = 1 + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(500)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(4)) + builder.addAttributesBuilder() + .setName("zone") + .setType(Value.Type.TEXT) + .setText(Text.newBuilder().setValue("us-central1-a")) + val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) + .setHostname(s"host${id.toString}").build() + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(offer) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 3 // Deducting 1 for executor + )) + + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(1) + + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + + assert(capture.getValue.size() === 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + assert(taskInfo.getResourcesCount === 1) + val cpusDev = taskInfo.getResourcesList.get(0) + assert(cpusDev.getName.equals("cpus")) + assert(cpusDev.getScalar.getValue.equals(1.0)) + val executorResources = taskInfo.getExecutor.getResourcesList.asScala + assert(executorResources.exists { r => + r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) + }) + assert(executorResources.exists { r => + r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) + }) + } + + test("cannot accept offers not meeting attribute requirements") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + val conf = new SparkConf() + .set("spark.mesos.constraints", "zone:us-central1-a") + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(conf) + when(sc.listenerBus).thenReturn(listenerBus) + + val id = 1 + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(500)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(4)) + builder.addAttributesBuilder() + .setName("zone") + .setType(Value.Type.TEXT) + .setText(Text.newBuilder().setValue("us-central1-b")) + val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) + .setHostname(s"host${id.toString}").build() + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(offer) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](0) + + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq())) + when(taskScheduler.CPUS_PER_TASK).thenReturn(1) + + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(mesosOffers.get(0).getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, never()).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + verify(driver, times(1)).declineOffer( + Matchers.eq(mesosOffers.get(0).getId), + any(classOf[Filters]) + ) + } }