Skip to content
10 changes: 10 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config
By default Mesos agents will not pull images they already have cached.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.parameters</code></td>
<td>(none)</td>
<td>
Set the list of custom parameters which will be passed into the <code>docker run</code> command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of
key/value pairs. Example:

<pre>key1=val1,key2=val2,key3=val3</pre>
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.docker.volumes</code></td>
<td>(none)</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume}
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}

import org.apache.spark.{SparkConf, SparkException}
Expand Down Expand Up @@ -99,6 +99,26 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.toList
}

/**
* Parse a list of docker parameters, each of which
* takes the form key=value
*/
private def parseParamsSpec(params: String): List[Parameter] = {
params.split(",").map(_.split("=")).flatMap { spec: Array[String] =>

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

s/spec/parameter, for consistency

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to be consistent with other methods, i think we should stick with spec.

def parsePortMappingsSpec(portmaps: String): List[DockerInfo.PortMapping] = {
portmaps.split(",").map(_.split(":")).flatMap { spec: Array[String] =>
val portmap: DockerInfo.PortMapping.Builder = DockerInfo.PortMapping

it seems to be better to reserve the name parameter later for builder

val param: Parameter.Builder = Parameter.newBuilder()

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not very familiar with docker parameters - but wont this method not fail if '=' exists in the value ?
Or is that prohibited by docker ?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm if a value contains a '=' we will have a parsing error.

@skonto skonto Apr 12, 2017

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

= in values seems valid for env vars: moby/moby#12763

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, so we should split with a limit instead.
@yanji84 can you fix this?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry missed out this comment earlier, Ii set limit to 2, pushed the new pull request

val param: Parameter.Builder = Parameter.newBuilder()
spec match {
case Array(key, value) =>
Some(param.setKey(key).setValue(value))
case spec =>
logWarning(s"Unable to parse arbitary parameters: $params. "
+ "Expected form: \"key=value(, ...)\"")
None
}
}
.map { _.build() }
.toList
}

def containerInfo(conf: SparkConf): ContainerInfo = {
val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
conf.get("spark.mesos.containerizer", "docker") == "docker") {
Expand All @@ -120,8 +140,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.map(parsePortMappingsSpec)
.getOrElse(List.empty)

val params = conf
.getOption("spark.mesos.executor.docker.parameters")
.map(parseParamsSpec)
.getOrElse(List.empty)

if (containerType == ContainerInfo.Type.DOCKER) {
containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
containerInfo
.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
} else {
containerInfo.setMesos(mesosInfo(image, forcePullImage))
}
Expand All @@ -144,11 +170,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
private def dockerInfo(
image: String,
forcePullImage: Boolean,
portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
portMaps: List[ContainerInfo.DockerInfo.PortMapping],
params: List[Parameter]): DockerInfo = {
val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
portMaps.foreach(dockerBuilder.addPortMappings(_))
params.foreach(dockerBuilder.addParameters(_))

dockerBuilder.build
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler.cluster.mesos

import org.scalatest._
import org.scalatest.mock.MockitoSugar

import org.apache.spark.{SparkConf, SparkFunSuite}

class MesosSchedulerBackendUtilSuite extends SparkFunSuite with Matchers with MockitoSugar {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is Matchers used anymore?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll check, if not, will remove it


test("ContainerInfo fails to parse invalid docker parameters") {
val conf = new SparkConf()
conf.set("spark.mesos.executor.docker.parameters", "a,b")
conf.set("spark.mesos.executor.docker.image", "test")

val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
val params = containerInfo.getDocker.getParametersList

assert(params.size() == 0)
}

test("ContainerInfo parses docker parameters") {
val conf = new SparkConf()
conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
conf.set("spark.mesos.executor.docker.image", "test")

val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
val params = containerInfo.getDocker.getParametersList
assert(params.size() == 3)
assert(params.get(0).getKey == "a")
assert(params.get(0).getValue == "1")
assert(params.get(1).getKey == "b")
assert(params.get(1).getValue == "2")
assert(params.get(2).getKey == "c")
assert(params.get(2).getValue == "3")
}
}