Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,15 @@ See the [configuration page](configuration.html) for information on Spark config
<pre>[host_path:]container_path[:ro|:rw]</pre>
</td>
</tr>
<tr>
<td><code>spark.mesos.task.labels</code></td>
<td>(none)</td>
<td>
Set the mesos labels to add to each task. Labels are free-form key-value pairs.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

capitalize Mesos.

Key-value pairs should be separated by a colon, and commas used to list more than one.
Ex. key:value,key2:value2.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)

private val taskLabels = conf.get("spark.mesos.task.labels", "").toString

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is toString redundant here or I miss something?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whoops, that is redundant, i'll drop that toString


private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
Expand Down Expand Up @@ -408,6 +410,22 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskBuilder.addAllResources(resourcesToUse.asJava)
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))

val labelsBuilder = taskBuilder.getLabelsBuilder

taskLabels.split(",").foreach(label => {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please create a separate method for this.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

label.split(":") match {
case Array(key, value) =>
labelsBuilder.addLabels(Label.newBuilder()
.setKey(key)
.setValue(value)
.build())
case _ =>
logWarning(s"Unable to parse $label into a key:value label for the task.")
}
})

taskBuilder.setLabels(labelsBuilder)

tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
}

test("mesos sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
))

// Build up the labels
val taskLabels = Protos.Labels.newBuilder()
.addLabels(Protos.Label.newBuilder()
.setKey("mesos").setValue("test").build())
.addLabels(Protos.Label.newBuilder()
.setKey("label").setValue("test").build())
.build()

val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")

val labels = launchedTasks.head.getLabels

assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos ignored invalid labels and sets configurable labels on tasks") {
val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
setBackend(Map(
"spark.mesos.task.labels" -> taskLabelsString
))

// Build up the labels
val taskLabels = Protos.Labels.newBuilder()
.addLabels(Protos.Label.newBuilder()
.setKey("mesos").setValue("test").build())
.addLabels(Protos.Label.newBuilder()
.setKey("label").setValue("test").build())
.build()

val offers = List(Resources(backend.executorMemory(sc), 1))
offerResources(offers)
val launchedTasks = verifyTaskLaunched(driver, "o1")

val labels = launchedTasks.head.getLabels

assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos supports spark.mesos.network.name") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"
Expand Down