Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ private[spark] class MesosClusterScheduler(
private val queuedDriversState = engineFactory.createEngine("driverQueue")
private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
private val pendingRetryDriversState = engineFactory.createEngine("retryList")
private val driverOfferConstraints =
parseConstraintString(conf.get("spark.mesos.constraints", ""))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the problem: This setting is read only once, when the dispatcher is started. What you want is to pick up the constraints set on the submitted job. Have a look at how other job-specific settings are treated in the code (there's a submitProperties variable, or something similar).

// Flag to mark if the scheduler is ready to be called, which is until the scheduler
// is registered with Mesos master.
@volatile protected var ready = false
Expand Down Expand Up @@ -510,6 +512,22 @@ private[spark] class MesosClusterScheduler(
}

override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {

// Filter by mesos constraints
stateLock.synchronized {
val it = offers.iterator()
while (it.hasNext) {
val offer = it.next()
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(driverOfferConstraints, offerAttributes)

if (!meetsConstraints) {
driver.declineOffer(offer.getId)
it.remove()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather not mutate the underlying collection. Anyhow, this has no effect because the code below is using currentOffers, which is a copy of the original resources made earlier. I suggest you insert this filter at the earlier point a few lines above, where currentOffers is created.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, it's true. I modify code base in 1.5.2 previously, and test ok in practice. in 1.6.0 version the code is change. I will revise it soon.thanks for your review.

}
}
}

val currentOffers = offers.asScala.map(o =>
new ResourceOffer(
o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem"))
Expand Down