Skip to content

Commit 825afa0

Browse files
Luc Bourlier and tnachen
authored and committed
Supports more spark-submit parameters
1 parent b8e7181 commit 825afa0

File tree

2 files changed

+28
-21
lines changed

2 files changed

+28
-21
lines changed

core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.deploy
1919

20-
private[deploy] class DriverDescription(
20+
private[spark] case class DriverDescription(
2121
val jarUrl: String,
2222
val mem: Int,
2323
val cores: Int,

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,6 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
164164
""
165165
}
166166

167-
// TODO: add support for more spark-submit parameters
168-
169167
val envBuilder = Environment.newBuilder()
170168
desc.command.environment.foreach {
171169
case (k, v) =>
@@ -175,40 +173,49 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
175173

176174
builder.setEnvironment(envBuilder.build())
177175

176+
val cmdOptions = generateCmdOption(req)
177+
178178
val executorUri = req.conf.getOption("spark.executor.uri")
179-
if (executorUri.isDefined) {
179+
val cmd = if (executorUri.isDefined) {
180180
builder.addUris(CommandInfo.URI.newBuilder().setValue(executorUri.get).build())
181181

182-
val basename = executorUri.get.split('/').last.split('.').head
183-
val cmd =
184-
Seq("bin/spark-submit",
185-
"--class", desc.command.mainClass,
186-
"--master", s"mesos://${conf.get("spark.master")}",
187-
s"../${desc.jarUrl.split("/").last}")
188-
.mkString(" ")
182+
val folderBasename = executorUri.get.split('/').last.split('.').head
183+
184+
val cmdExecutable = s"cd $folderBasename*; $prefixEnv bin/spark-submit"
185+
186+
val cmdJar = s"../${desc.jarUrl.split("/").last}"
189187

190-
builder.setValue(
191-
s"cd $basename*; $prefixEnv $cmd")
188+
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar"
192189
} else {
193190
val executorSparkHome = req.conf.getOption("spark.mesos.executor.home")
194191
.getOrElse {
195192
throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
196193
}
197194

198-
val cmd =
199-
Seq(new File(executorSparkHome, "./bin/spark-submit"),
200-
"--class", desc.command.mainClass,
201-
"--master", s"mesos://${conf.get("spark.master")}",
202-
desc.jarUrl.split("/").last)
203-
.mkString(" ")
195+
val cmdExecutable = new File(executorSparkHome, "./bin/spark-submit").getCanonicalPath
204196

205-
builder.setValue(
206-
s"$prefixEnv $cmd")
197+
val cmdJar = desc.jarUrl.split("/").last
198+
199+
s"$cmdExecutable ${cmdOptions.mkString(" ")} $cmdJar"
207200
}
208201

202+
builder.setValue(cmd)
203+
209204
builder.build
210205
}
211206

207+
private def generateCmdOption(req: DriverRequest): Seq[String] = {
208+
Seq(
209+
"--name", req.conf.get("spark.app.name"),
210+
"--class", req.desc.command.mainClass,
211+
"--master", s"mesos://${conf.get("spark.master")}",
212+
"--driver-cores", req.desc.cores.toString,
213+
"--driver-memory", s"${req.desc.mem}M",
214+
"--executor-memory", req.conf.get("spark.executor.memory"),
215+
"--total-executor-cores", req.conf.get("spark.cores.max")
216+
)
217+
}
218+
212219
override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = {
213220
// We should try to schedule all the drivers if the offers fit.
214221

0 commit comments

Comments (0)