-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-19740][MESOS]Add support in Spark to pass arbitrary parameters into docker when running on mesos with docker containerizer #17109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
4f8368e
bba57f9
ae30e23
ecb7a8e
4087936
54f9ec8
423dfa8
0696d4f
03e89eb
3417721
737acf0
cbb784a
2f3f8b2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config | |
| By default Mesos agents will not pull images they already have cached. | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.mesos.executor.docker.params</code></td> | ||
| <td>(none)</td> | ||
| <td> | ||
| Set the list of custom parameters which will be passed into the <code>docker run</code> command when launching the Spark executor on mesos using docker containerizer. The format of this property is a comma-separated list of | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/mesos/Mesos s/docker containerizer/the docker containerizer |
||
| key/value pairs. That is they take the form: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/That is they take the form:/Example: |
||
|
|
||
| <pre>key1=val1,key2=val2,key3=val3</pre> | ||
| </td> | ||
| </tr> | ||
| <tr> | ||
| <td><code>spark.mesos.executor.docker.volumes</code></td> | ||
| <td>(none)</td> | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,7 @@ | |
|
|
||
| package org.apache.spark.scheduler.cluster.mesos | ||
|
|
||
| import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume} | ||
| import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume} | ||
| import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo} | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkException} | ||
|
|
@@ -99,6 +99,26 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { | |
| .toList | ||
| } | ||
|
|
||
| /** | ||
| * Parse a comma-delimited list of arbitrary parameters, each of which | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "list of docker parameters" |
||
| * takes the form key=value | ||
| */ | ||
| def parseParamsSpec(params: String): List[Parameter] = { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. private |
||
| params.split(",").map(_.split("=")).flatMap { kv: Array[String] => | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. s/kv/parameter for naming consistency |
||
| val param: Parameter.Builder = Parameter.newBuilder() | ||
| kv match { | ||
| case Array(key, value) => | ||
| Some(param.setKey(key).setValue(value)) | ||
| case kv => | ||
| logWarning(s"Unable to parse arbitary parameters: $params. " | ||
| + "Expected form: \"key=value(, ...)\"") | ||
| None | ||
| } | ||
| } | ||
| .map { _.build() } | ||
| .toList | ||
| } | ||
|
|
||
| def containerInfo(conf: SparkConf): ContainerInfo = { | ||
| val containerType = if (conf.contains("spark.mesos.executor.docker.image") && | ||
| conf.get("spark.mesos.containerizer", "docker") == "docker") { | ||
|
|
@@ -120,8 +140,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { | |
| .map(parsePortMappingsSpec) | ||
| .getOrElse(List.empty) | ||
|
|
||
| val params = conf | ||
| .getOption("spark.mesos.executor.docker.params") | ||
| .map(parseParamsSpec) | ||
| .getOrElse(List.empty) | ||
|
|
||
| if (containerType == ContainerInfo.Type.DOCKER) { | ||
| containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps)) | ||
| containerInfo | ||
| .setDocker(dockerInfo(image, forcePullImage, portMaps, params)) | ||
| } else { | ||
| containerInfo.setMesos(mesosInfo(image, forcePullImage)) | ||
| } | ||
|
|
@@ -144,11 +170,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { | |
| private def dockerInfo( | ||
| image: String, | ||
| forcePullImage: Boolean, | ||
| portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = { | ||
| portMaps: List[ContainerInfo.DockerInfo.PortMapping], | ||
| params: List[Parameter]): DockerInfo = { | ||
| val dockerBuilder = ContainerInfo.DockerInfo.newBuilder() | ||
| .setImage(image) | ||
| .setForcePullImage(forcePullImage) | ||
| portMaps.foreach(dockerBuilder.addPortMappings(_)) | ||
| params.foreach(dockerBuilder.addParameters(_)) | ||
|
|
||
| dockerBuilder.build | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one or more | ||
| * contributor license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright ownership. | ||
| * The ASF licenses this file to You under the Apache License, Version 2.0 | ||
| * (the "License"); you may not use this file except in compliance with | ||
| * the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.apache.spark.scheduler.cluster.mesos | ||
|
|
||
| import org.scalatest._ | ||
| import org.scalatest.mock.MockitoSugar | ||
|
|
||
| import org.apache.spark.{SparkConf, SparkFunSuite} | ||
|
|
||
| class MesosSchedulerBackendUtilSuite extends SparkFunSuite with Matchers with MockitoSugar { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i'll check, if not, will remove it |
||
|
|
||
| test("Parse arbitrary parameter to pass into docker containerizer") { | ||
| val parsed = MesosSchedulerBackendUtil.parseParamsSpec("a=1,b=2,c=3") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be a private method, so let's not test it. Just test |
||
| parsed(0).getKey shouldBe "a" | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. check out the other test suites. We use |
||
| parsed(0).getValue shouldBe "1" | ||
| parsed(1).getKey shouldBe "b" | ||
| parsed(1).getValue shouldBe "2" | ||
| parsed(2).getKey shouldBe "c" | ||
| parsed(2).getValue shouldBe "3" | ||
|
|
||
| val invalid = MesosSchedulerBackendUtil.parseParamsSpec("a,b") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. invalid entries should be in a separate test |
||
| invalid.length shouldBe 0 | ||
| } | ||
|
|
||
| test("ContainerInfo contains parsed arbitrary parameters") { | ||
| val conf = new SparkConf() | ||
| conf.set("spark.mesos.executor.docker.params", "a=1,b=2,c=3") | ||
| conf.set("spark.mesos.executor.docker.image", "test") | ||
|
|
||
| val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf) | ||
| val params = containerInfo.getDocker.getParametersList | ||
| params.size() shouldBe 3 | ||
| params.get(0).getKey shouldBe "a" | ||
| params.get(0).getValue shouldBe "1" | ||
| params.get(1).getKey shouldBe "b" | ||
| params.get(1).getValue shouldBe "2" | ||
| params.get(2).getKey shouldBe "c" | ||
| params.get(2).getValue shouldBe "3" | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/params/parameters
to be consistent with the Mesos protobuf terminology