MesosClusterScheduler.scala
@@ -472,7 +472,8 @@ private[spark] class MesosClusterScheduler(
private class ResourceOffer(
val offerId: OfferID,
val slaveId: SlaveID,
var resources: JList[Resource]) {
var resources: JList[Resource],
var attributes: JList[Attribute]) {
override def toString(): String = {
s"Offer id: ${offerId}, resources: ${resources}"
}
@@ -491,8 +492,14 @@ private[spark] class MesosClusterScheduler(
for (submission <- candidates) {
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val submissionConstraints = parseConstraintString(
submission.schedulerProperties.get("spark.mesos.constraints").getOrElse(""))
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
"constraints: $submissionConstraints")
val offerOption = currentOffers.find { o =>
val offerAttributes = toAttributeMap(o.attributes)
val meetsConstraints = matchesAttributeRequirements(submissionConstraints, offerAttributes)
meetsConstraints &&
getResource(o.resources, "cpus") >= driverCpu &&
getResource(o.resources, "mem") >= driverMem
}
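
For context on the helpers used above: `parseConstraintString` (in `MesosSchedulerUtils`) turns the `spark.mesos.constraints` value, e.g. `zone:us-central1-a;rack:r1,r2`, into a `Map[String, Set[String]]`, and `matchesAttributeRequirements` checks each entry against the offer's attributes. A minimal sketch of the TEXT-attribute case, assuming those semantics (`matchesSketch` is a hypothetical stand-in; the real helper also handles SET, SCALAR, and RANGES attribute types):

// Hypothetical stand-in for MesosSchedulerUtils.matchesAttributeRequirements,
// covering only TEXT attributes.
def matchesSketch(
    constraints: Map[String, Set[String]],
    offerAttributes: Map[String, String]): Boolean = {
  constraints.forall { case (name, wantedValues) =>
    offerAttributes.get(name).exists { actual =>
      // An empty value set means the attribute only needs to be present.
      wantedValues.isEmpty || wantedValues.contains(actual)
    }
  }
}

// matchesSketch(Map("zone" -> Set("us-central1-a")), Map("zone" -> "us-central1-a"))  // true
// matchesSketch(Map("zone" -> Set("us-central1-a")), Map("zone" -> "us-central1-b"))  // false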
@@ -547,7 +554,7 @@ private[spark] class MesosClusterScheduler(
val currentTime = new Date()

val currentOffers = offers.asScala.map {
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList, o.getAttributesList)
}.toList

stateLock.synchronized {
MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos.Value.Text
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -252,6 +253,36 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}

test("can accept offers meeting attribute requirements") {
setBackend(Map("spark.mesos.constraints" -> "zone:us-central1-a"))

val minMem = backend.executorMemory(sc)
val minCpu = 4

val mesosOffers = new java.util.ArrayList[Offer]
val offer = createOffer("o1", "s1", minMem, minCpu,
Map("zone" -> "us-central1-a"))
mesosOffers.add(offer)
backend.resourceOffers(driver, mesosOffers)

verifyTaskLaunched("o1")
}

test("cannot accept offers not meeting attribute requirements") {
setBackend(Map("spark.mesos.constraints" -> "zone:us-central1-a"))

val minMem = backend.executorMemory(sc)
val minCpu = 4

val mesosOffers = new java.util.ArrayList[Offer]
val offer = createOffer("o1", "s1", minMem, minCpu,
Map("zone" -> "us-central1-b"))
mesosOffers.add(offer)
backend.resourceOffers(driver, mesosOffers)

verifyDeclinedOffer(driver, createOfferId("o1"), true)
}
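
Why the second offer is declined, under the assumed parse of the constraint string (attributes separated by `;`, candidate values by `,`):

// "zone:us-central1-a" is assumed to parse to Map("zone" -> Set("us-central1-a")).
// The first offer's TEXT attribute zone=us-central1-a satisfies it; the second
// offer's zone=us-central1-b does not, so that offer is declined with a filter.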

private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -302,7 +333,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
TaskID.newBuilder().setValue(taskId).build()
}

private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int,
attributes: Map[String, String] = null): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -317,7 +349,15 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
.setHostname(s"host${slaveId}")
.build()
if (attributes != null) {
for (attr <- attributes) {
builder.addAttributesBuilder()
.setName(attr._1)
.setType(Value.Type.TEXT)
.setText(Text.newBuilder().setValue(attr._2))
}
}
builder.build()
}
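
A usage sketch for the extended helper: the `null` default keeps existing call sites unchanged, while the new tests pass a map of TEXT attributes.

// Hypothetical call sites, mirroring the tests above:
createOffer("o1", "s1", minMem, minCpu)                                  // no attributes
createOffer("o1", "s1", minMem, minCpu, Map("zone" -> "us-central1-a"))  // with a zone attribute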

private def createSchedulerBackend(
MesosFineGrainedSchedulerBackendSuite.scala
@@ -29,6 +29,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
import org.apache.mesos.Protos.Value.Scalar
import org.apache.mesos.Protos.Value.Text
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -380,4 +381,158 @@ class MesosFineGrainedSchedulerBackendSuite
r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
})
}

test("can accept offers meeting attribute requirements") {
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
val conf = new SparkConf()
.set("spark.mesos.constraints", "zone:us-central1-a")

val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = mock[SparkContext]
when(sc.executorMemory).thenReturn(100)
when(sc.getSparkHome()).thenReturn(Option("/path"))
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.conf).thenReturn(conf)
when(sc.listenerBus).thenReturn(listenerBus)

val id = 1
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(500))
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(4))
builder.addAttributesBuilder()
.setName("zone")
.setType(Value.Type.TEXT)
.setText(Text.newBuilder().setValue("us-central1-a"))
val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
.setHostname(s"host${id.toString}").build()

val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(offer)

val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
3 // Deducting 1 for executor
))

val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)

val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
when(
driver.launchTasks(
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers)

verify(driver, times(1)).launchTasks(
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)

assert(capture.getValue.size() === 1)
val taskInfo = capture.getValue.iterator().next()
assert(taskInfo.getName.equals("n1"))
assert(taskInfo.getResourcesCount === 1)
val cpusDev = taskInfo.getResourcesList.get(0)
assert(cpusDev.getName.equals("cpus"))
assert(cpusDev.getScalar.getValue.equals(1.0))
val executorResources = taskInfo.getExecutor.getResourcesList.asScala
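// The 484 below is assumed to be the mocked 100 MB of executor memory plus
// the default Mesos memory overhead, max(384 MB, 10% of executor memory).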
assert(executorResources.exists { r =>
r.getName.equals("mem") && r.getScalar.getValue.equals(484.0)
})
assert(executorResources.exists { r =>
r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0)
})
}

test("cannot accept offers not meeting attribute requirements") {
val driver = mock[SchedulerDriver]
val taskScheduler = mock[TaskSchedulerImpl]
val conf = new SparkConf()
.set("spark.mesos.constraints", "zone:us-central1-a")

val listenerBus = mock[LiveListenerBus]
listenerBus.post(
SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))

val sc = mock[SparkContext]
when(sc.executorMemory).thenReturn(100)
when(sc.getSparkHome()).thenReturn(Option("/path"))
when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
when(sc.conf).thenReturn(conf)
when(sc.listenerBus).thenReturn(listenerBus)

val id = 1
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(500))
builder.addResourcesBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(4))
builder.addAttributesBuilder()
.setName("zone")
.setType(Value.Type.TEXT)
.setText(Text.newBuilder().setValue("us-central1-b"))
val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
.setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
.setHostname(s"host${id.toString}").build()

val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(offer)

val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")

val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](0)

when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq()))
when(taskScheduler.CPUS_PER_TASK).thenReturn(1)

val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
when(
driver.launchTasks(
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
).thenReturn(Status.valueOf(1))
when(driver.declineOffer(mesosOffers.get(0).getId)).thenReturn(Status.valueOf(1))

backend.resourceOffers(driver, mesosOffers)

verify(driver, never()).launchTasks(
Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
capture.capture(),
any(classOf[Filters])
)
verify(driver, times(1)).declineOffer(
Matchers.eq(mesosOffers.get(0).getId),
any(classOf[Filters])
)
}
}