diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 05fda0fded7f8..87760ab8f0b0d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -142,6 +142,8 @@ private[spark] class MesosClusterScheduler(
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private val driverOfferConstraints =
+    parseConstraintString(conf.get("spark.mesos.constraints", ""))
   // Flag to mark if the scheduler is ready to be called, which is until the scheduler
   // is registered with Mesos master.
   @volatile protected var ready = false
@@ -510,6 +512,22 @@ private[spark] class MesosClusterScheduler(
   }
 
   override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
+
+    // Filter by mesos constraints
+    stateLock.synchronized {
+      val it = offers.iterator()
+      while (it.hasNext) {
+        val offer = it.next()
+        val offerAttributes = toAttributeMap(offer.getAttributesList)
+        val meetsConstraints = matchesAttributeRequirements(driverOfferConstraints, offerAttributes)
+
+        if (!meetsConstraints) {
+          driver.declineOffer(offer.getId)
+          it.remove()
+        }
+      }
+    }
+
     val currentOffers = offers.asScala.map(o => new ResourceOffer(
       o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
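
For context, the diff relies on the `parseConstraintString`, `matchesAttributeRequirements`, and `toAttributeMap` helpers already used by the other Mesos schedulers in this package. The sketch below is a simplified, standalone illustration of the constraint semantics assumed here (entries separated by `;`, an attribute followed by `:` and a comma-separated list of accepted values, a bare attribute only requiring presence); it uses plain `String` attribute values instead of Mesos protobuf `Attribute`s and is not the actual Spark implementation.

```scala
// Standalone sketch (not MesosSchedulerUtils itself): simplified constraint
// parsing and matching, assuming string-valued offer attributes.
object ConstraintSketch {

  // "os:centos7;zone:us-east-1a,us-east-1b" -> Map(os -> Set(centos7), zone -> Set(...)).
  // An attribute with no value (e.g. "gpu") only requires the attribute to be present.
  def parseConstraintString(constraints: String): Map[String, Set[String]] = {
    if (constraints.isEmpty) {
      Map.empty
    } else {
      constraints.split(";").map { entry =>
        entry.split(":", 2) match {
          case Array(attr) => attr -> Set.empty[String]
          case Array(attr, values) => attr -> values.split(",").toSet
        }
      }.toMap
    }
  }

  // An offer matches when every constrained attribute is present and, if values
  // were given, the offer's value is one of them.
  def matchesAttributeRequirements(
      constraints: Map[String, Set[String]],
      offerAttributes: Map[String, String]): Boolean = {
    constraints.forall { case (attr, wanted) =>
      offerAttributes.get(attr) match {
        case Some(value) => wanted.isEmpty || wanted.contains(value)
        case None => false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val constraints = parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b")
    // Offer satisfies both constraints -> accepted.
    println(matchesAttributeRequirements(constraints, Map("os" -> "centos7", "zone" -> "us-east-1a")))
    // Wrong OS -> the dispatcher would decline this offer, as in the diff above.
    println(matchesAttributeRequirements(constraints, Map("os" -> "ubuntu", "zone" -> "us-east-1a")))
  }
}
```

Declining non-matching offers inside `stateLock.synchronized` and removing them from the mutable `offers` list keeps the rest of `resourceOffers` (the `currentOffers` scheduling below) operating only on offers that satisfy `spark.mesos.constraints`.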