diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index f0172504c55a..e678143a570c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -18,19 +18,21 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{List => JList}
-import java.util.Collections
+import java.util.{Collections, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
 
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.{Scheduler => MScheduler, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.worker.CommandUtils
+import org.apache.spark.executor.CoarseGrainedExecutorBackend
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
 
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -111,16 +113,16 @@ private[spark] class CoarseMesosSchedulerBackend(
 
   def createCommand(offer: Offer, numCores: Int): CommandInfo = {
     val environment = Environment.newBuilder()
-    val extraClassPath = conf.getOption("spark.executor.extraClassPath")
-    extraClassPath.foreach { cp =>
-      environment.addVariables(
-        Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
+    val classPathEntries = conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+      cp.split(File.pathSeparator)
     }
     val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
+      .map(Utils.splitCommandString).getOrElse(Seq.empty)
 
-    val libraryPathOption = "spark.executor.extraLibraryPath"
-    val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
-    val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
+    val libraryPathEntries =
+      conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
+        lp.split(File.pathSeparator)
+      }
 
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
@@ -136,21 +138,28 @@ private[spark] class CoarseMesosSchedulerBackend(
       conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
 
+    val mesosCommand = Command(
+      classOf[CoarseGrainedExecutorBackend].getCanonicalName,
+      Seq(driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores.toString),
+      sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
+
     val uri = conf.get("spark.executor.uri", null)
     if (uri == null) {
-      val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
-      command.setValue(
-        "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
-          runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
+      command.setValue(CommandUtils.buildCommandSeq(
+        mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
     } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
+      // Grab everything up to the first '.'. We'll use that and '*' to glob the directory
+      // "correctly". For example, let the URI be:
+      //
+      //   hdfs://localhost:9000/tmp/mesos/spark-1.1.0-bin-hadoop2.tgz
+      //
+      // then "basename" is "spark-1". When the Mesos executor is started, its working directory is
+      // set to the root directory of the sandbox (one level above the directory unpacked from the
+      // Spark distribution tarball), so "cd spark-1*" brings us to the correct executor-side
+      // Spark home.
       val basename = uri.split('/').last.split('.').head
-      command.setValue(
-        ("cd %s*; " +
-          "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
-          .format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
-            offer.getHostname, numCores))
+      command.setValue(s"cd $basename*; " + CommandUtils.buildCommandSeq(
+        mesosCommand, sc.executorMemory, ".").mkString("\"", "\" \"", "\""))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     command.build()
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index c717e7c621a8..fd7d1edcbd0a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -18,20 +18,21 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
+import java.util.{Collections, ArrayList => JArrayList, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 
-import org.apache.mesos.protobuf.ByteString
-import org.apache.mesos.{Scheduler => MScheduler}
-import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
+import org.apache.mesos.protobuf.ByteString
+import org.apache.mesos.{Scheduler => MScheduler, _}
 
-import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
+import org.apache.spark.deploy.Command
+import org.apache.spark.deploy.worker.CommandUtils
+import org.apache.spark.executor.MesosExecutorBackend
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
 import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
 
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -85,27 +86,55 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
+  // TODO Extract common code from this method and CoarseMesosSchedulerBackend.createCommand
   def createExecutorInfo(execId: String): ExecutorInfo = {
     val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
       "Spark home is not set; set it through the spark.home system " +
       "property, the SPARK_HOME environment variable or the SparkContext constructor"))
     val environment = Environment.newBuilder()
+    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
+      cp.split(File.pathSeparator)
+    }
+    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
+      .map(Utils.splitCommandString).getOrElse(Seq.empty)
+
+    val libraryPathEntries =
+      sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
+        lp.split(File.pathSeparator)
+      }
     sc.executorEnvs.foreach { case (key, value) =>
       environment.addVariables(Environment.Variable.newBuilder()
         .setName(key)
         .setValue(value)
         .build())
     }
+    val mesosCommand = Command(
+      classOf[MesosExecutorBackend].getCanonicalName,
+      Seq.empty, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
     val command = CommandInfo.newBuilder()
       .setEnvironment(environment)
+    // We have to use "sbin/mesos-pyenv.sh" to set up the PYTHONPATH environment variable instead
+    // of setting it in the Scala code here, because there's no way to figure out the executor-side
+    // Spark home on the driver side in advance.
     val uri = sc.conf.get("spark.executor.uri", null)
     if (uri == null) {
-      command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
+      val pyEnv = new File(sparkHome, "sbin/mesos-pyenv.sh").getCanonicalPath
+      command.setValue(s"$pyEnv; " + CommandUtils.buildCommandSeq(
+        mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
     } else {
-      // Grab everything to the first '.'. We'll use that and '*' to
-      // glob the directory "correctly".
+      // Grab everything up to the first '.'. We'll use that and '*' to glob the directory
+      // "correctly". For example, let the URI be:
+      //
+      //   hdfs://localhost:9000/tmp/mesos/spark-1.1.0-bin-hadoop2.tgz
+      //
+      // then "basename" is "spark-1". When the Mesos executor is started, its working directory is
+      // set to the root directory of the sandbox (one level above the directory unpacked from the
+      // Spark distribution tarball), so "cd spark-1*" brings us to the correct executor-side
+      // Spark home.
       val basename = uri.split('/').last.split('.').head
-      command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
+      command.setValue(s"cd $basename*; ./sbin/mesos-pyenv.sh; " +
+        CommandUtils.buildCommandSeq(mesosCommand, sc.executorMemory, ".")
+          .mkString("\"", "\" \"", "\""))
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
     val memory = Resource.newBuilder()
diff --git a/sbin/spark-executor b/sbin/mesos-pyenv.sh
similarity index 87%
rename from sbin/spark-executor
rename to sbin/mesos-pyenv.sh
index 3621321a9bc8..8bb2296ad84f 100755
--- a/sbin/spark-executor
+++ b/sbin/mesos-pyenv.sh
@@ -17,10 +17,10 @@
 # limitations under the License.
 #
 
+#
+# This script is only used for fine-grained Mesos executors
+
 FWDIR="$(cd `dirname $0`/..; pwd)"
 
 export PYTHONPATH=$FWDIR/python:$PYTHONPATH
 export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
-
-echo "Running spark-executor with framework dir = $FWDIR"
-exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend
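Not part of the patch above: a minimal, standalone sketch of the argument-quoting idiom both backends now rely on. `CommandUtils.buildCommandSeq` returns the executor launch command as a `Seq[String]`, and the patch collapses it into the single shell string passed to `CommandInfo.setValue` via `mkString("\"", "\" \"", "\"")`, so every token is individually double-quoted and arguments containing spaces (extra Java options, classpath entries) survive the Mesos shell launch. The token values below are hypothetical and only illustrate the shape of the output.

```scala
// Standalone sketch; the paths, driver URL, and slave id are made up.
object QuotedCommandSketch {
  // Same join the patch uses: wrap every token in double quotes and
  // separate adjacent tokens with a single space.
  def toShellString(tokens: Seq[String]): String =
    tokens.mkString("\"", "\" \"", "\"")

  def main(args: Array[String]): Unit = {
    // Roughly the shape of a coarse-grained executor launch command:
    // java binary, JVM options, backend main class, then its arguments.
    val tokens = Seq(
      "/usr/lib/jvm/default/bin/java",
      "-cp", "/opt/spark/conf:/opt/spark/lib/*",
      "-Xms2048M", "-Xmx2048M",
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler",
      "20141103-000123-1234-0001-S0", "mesos-slave-1", "4")

    println(toShellString(tokens))
    // "/usr/lib/jvm/default/bin/java" "-cp" "/opt/spark/conf:/opt/spark/lib/*" ...
  }
}
```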