diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ae3855084a650..0e5a20c578db3 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -537,6 +537,20 @@ See the [configuration page](configuration.html) for information on Spark config
for more details.
+
+<tr>
+  <td><code>spark.mesos.network.labels</code></td>
+  <td>(none)</td>
+  <td>
+    Pass network labels to CNI plugins. This is a comma-separated list
+    of key-value pairs, where each key-value pair has the format key:value.
+    Example:
+    <pre>key1:val1,key2:val2</pre>
+    See
+    <a href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the Mesos CNI docs</a>
+    for more details.
+  </td>
+</tr>
spark.mesos.fetcherCache.enable |
false |
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 6c8619e3c3c13..a5015b9243316 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,7 +56,7 @@ package object config {
.stringConf
.createOptional
- private [spark] val DRIVER_LABELS =
+ private[spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
.doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
@@ -64,10 +64,25 @@ package object config {
.stringConf
.createOptional
- private [spark] val DRIVER_FAILOVER_TIMEOUT =
+ private[spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)
+
+ private[spark] val NETWORK_NAME =
+ ConfigBuilder("spark.mesos.network.name")
+ .doc("Attach containers to the given named network. If this job is launched " +
+ "in cluster mode, also launch the driver in the given named network.")
+ .stringConf
+ .createOptional
+
+ private[spark] val NETWORK_LABELS =
+ ConfigBuilder("spark.mesos.network.labels")
+ .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
+ "of key-value pairs, where each key-value pair has the format key:value. " +
+ "Example: key1:val1,key2:val2")
+ .stringConf
+ .createOptional
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b09572121d6..b2daeaa8d2141 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -668,7 +668,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
private def executorHostname(offer: Offer): String = {
- if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+ if (sc.conf.get(NETWORK_NAME).isDefined) {
// The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
"0.0.0.0"
} else {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index fbcbc55099ec5..e5c1e801f2772 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
import org.apache.spark.internal.Logging
/**
@@ -161,8 +162,12 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
}
- conf.getOption("spark.mesos.network.name").map { name =>
- val info = NetworkInfo.newBuilder().setName(name).build()
+ conf.get(NETWORK_NAME).map { name =>
+ val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
+ val info = NetworkInfo.newBuilder()
+ .setName(name)
+ .setLabels(networkLabels)
+ .build()
containerInfo.addNetworkInfos(info)
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 0bb47906347d5..50bb501071509 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -222,7 +222,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
}
- test("supports spark.mesos.network.name") {
+ test("supports spark.mesos.network.name and spark.mesos.network.labels") {
setScheduler()
val mem = 1000
@@ -233,7 +233,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
- "spark.mesos.network.name" -> "test-network-name"),
+ "spark.mesos.network.name" -> "test-network-name",
+ "spark.mesos.network.labels" -> "key1:val1,key2:val2"),
"s1",
new Date()))
@@ -246,6 +247,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
+ assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+ assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+ assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+ assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}
test("supports spark.mesos.driver.labels") {
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d9ff4a403ea36..605ed07b641a8 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getLabels.equals(taskLabels))
}
- test("mesos supports spark.mesos.network.name") {
+ test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
setBackend(Map(
- "spark.mesos.network.name" -> "test-network-name"
+ "spark.mesos.network.name" -> "test-network-name",
+ "spark.mesos.network.labels" -> "key1:val1,key2:val2"
))
val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
+ assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+ assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+ assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+ assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}
test("supports spark.scheduler.minRegisteredResourcesRatio") {