CoarseMesosSchedulerBackend.scala
@@ -18,19 +18,21 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{List => JList}
import java.util.Collections
import java.util.{Collections, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler => MScheduler, _}

import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils

Contributor: This set of utilities is not intended to be used outside of the deploy code (i.e. Spark's standalone scheduler); that's why it's causing issues here.

import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -111,16 +113,16 @@ private[spark] class CoarseMesosSchedulerBackend(

def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
extraClassPath.foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
val classPathEntries = conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(File.pathSeparator)
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

val libraryPathOption = "spark.executor.extraLibraryPath"
val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
val libraryPathEntries =
conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
lp.split(File.pathSeparator)
}

sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -136,21 +138,28 @@
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

val mesosCommand = Command(
classOf[CoarseGrainedExecutorBackend].getCanonicalName,
Seq(driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores.toString),
sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)

val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.setValue(CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
// Grab everything to the first '.'. We'll use that and '*' to glob the directory "correctly".
// For example, let the URI be:
//
// hdfs://localhost:9000/tmp/mesos/spark-1.1.0-bin-hadoop2.tgz
//
// then "basename" is "spark-1". When the Mesos executor is started, the working directory is
// set to the root directory of the sandbox (one level up to the directory uncompressed from
// the Spark distribution tarball), so "cd spark-1*" brings us to the correct executor side
// Spark home.
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
.format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores))
command.setValue(s"cd $basename*; " + CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, ".").mkString("\"", "\" \"", "\""))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
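To make the comment in the else branch above concrete, here is a small, self-contained Scala sketch (no Spark dependencies) of how the basename is derived from the example URI and turned into the glob used to enter the unpacked directory inside the Mesos sandbox:

```scala
object BasenameGlobExample {
  def main(args: Array[String]): Unit = {
    // Same example URI as in the comment above.
    val uri = "hdfs://localhost:9000/tmp/mesos/spark-1.1.0-bin-hadoop2.tgz"

    // Last path segment, then everything before the first '.'.
    val basename = uri.split('/').last.split('.').head
    println(basename)  // prints: spark-1

    // Prefixing the executor command with this glob lets it cd into whatever
    // directory the tarball unpacked to inside the Mesos sandbox.
    println(s"cd $basename*; ./bin/spark-class ...")  // prints: cd spark-1*; ./bin/spark-class ...
  }
}
```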
MesosSchedulerBackend.scala
@@ -18,20 +18,21 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import java.util.{Collections, ArrayList => JArrayList, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}

import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}

/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -85,27 +86,55 @@ private[spark] class MesosSchedulerBackend(
}
}

// TODO Extract common code from this method and CoarseMesosSchedulerBackend.createCommand
def createExecutorInfo(execId: String): ExecutorInfo = {
val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(File.pathSeparator)
}
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

val libraryPathEntries =
sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
lp.split(File.pathSeparator)
}
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
.build())
}
val mesosCommand = Command(
classOf[MesosExecutorBackend].getCanonicalName,
Seq.empty, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
// We have to use "sbin/mesos-pyenv.sh" to setup the PYTHONPATH environment variable instead of
// setting it from the Scala code here. Because there's no way to figure out Mesos executor side
// Spark home in advance on driver side.
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
val pyEnv = new File(sparkHome, "sbin/mesos-pyenv.sh").getCanonicalPath
command.setValue(s"$pyEnv; " + CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
// Grab everything to the first '.'. We'll use that and '*' to glob the directory "correctly".
// For example, let the URI be:
//
// hdfs://localhost:9000/tmp/mesos/spark-1.1.0-bin-hadoop2.tgz
//
// then "basename" is "spark-1". When the Mesos executor is started, the working directory is
// set to the root directory of the sandbox (one level up to the directory uncompressed from
// the Spark distribution tarball), so "cd spark-1*" brings us to the correct executor side
// Spark home.
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.setValue(s"cd $basename*; ./sbin/mesos-pyenv.sh; " +
CommandUtils.buildCommandSeq(mesosCommand, sc.executorMemory, ".")
.mkString("\"", "\" \"", "\""))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val memory = Resource.newBuilder()
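Both backends collapse the Seq[String] returned by CommandUtils.buildCommandSeq into a single shell line by wrapping every element in double quotes. Below is a minimal sketch of just that quoting step; the argument values are made up for illustration, the real sequence comes from CommandUtils.buildCommandSeq:

```scala
object QuotedCommandLineExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the Seq[String] a command builder might return;
    // the values below are illustrative only.
    val cmd = Seq(
      "/usr/lib/jvm/java-7/bin/java",
      "-cp", "/opt/spark/conf:/opt/spark/lib/*",
      "-Xms1024M", "-Xmx1024M",
      "org.apache.spark.executor.MesosExecutorBackend")

    // mkString("\"", "\" \"", "\"") double-quotes every element and joins them
    // with spaces, so arguments containing spaces survive the shell that Mesos
    // uses to run the command.
    val shellLine = cmd.mkString("\"", "\" \"", "\"")
    println(shellLine)
    // "/usr/lib/jvm/java-7/bin/java" "-cp" "/opt/spark/conf:/opt/spark/lib/*" ... (every element double-quoted)
  }
}
```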
6 changes: 3 additions & 3 deletions sbin/spark-executor → sbin/mesos-pyenv.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
# limitations under the License.
#

#
# This script is only used for fine grained Mesos executors

FWDIR="$(cd `dirname $0`/..; pwd)"

export PYTHONPATH=$FWDIR/python:$PYTHONPATH
export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH

Contributor: It seems that we don't even need this file if we're doing this. We can just export the PYTHONPATH inside the Mesos backend classes themselves.

Contributor (Author): I tried to, but left this file here because it seemed non-trivial to figure out $FWDIR from Scala code.

Contributor: We can use sparkHome... we need it anyway to find this script.

Contributor (Author): But sparkHome is the driver-side Spark home directory, which can't be used to assemble the Mesos executor-side command line. On the other hand, sbin/mesos-pyenv.sh always executes on the Mesos executor side, so $FWDIR always points to the right location.


echo "Running spark-executor with framework dir = $FWDIR"
exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend
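To summarize the thread above, here is a rough Scala sketch of the two command-line shapes the fine-grained backend produces under this change. The paths and fakeBuildCommandSeq are made-up stand-ins (the real string comes from CommandUtils.buildCommandSeq); they only illustrate that in the spark.executor.uri case every path is relative to the sandbox, so the driver cannot compute the executor-side Spark home (or PYTHONPATH) in advance:

```scala
object ExecutorCommandShapes {
  def main(args: Array[String]): Unit = {
    // Made-up stand-in for CommandUtils.buildCommandSeq; it only mimics the shape
    // of the output (launcher script + backend class), not the real contents.
    def fakeBuildCommandSeq(sparkHome: String): Seq[String] =
      Seq(s"$sparkHome/bin/spark-class", "org.apache.spark.executor.MesosExecutorBackend")

    def quote(cmd: Seq[String]): String = cmd.mkString("\"", "\" \"", "\"")

    // Case 1: no spark.executor.uri. The executor host is assumed to have Spark
    // installed at the driver-side sparkHome, so absolute paths can be used.
    val sparkHome = "/opt/spark"
    println(s"$sparkHome/sbin/mesos-pyenv.sh; " + quote(fakeBuildCommandSeq(sparkHome)))

    // Case 2: spark.executor.uri is set. The tarball is fetched into the Mesos
    // sandbox, so everything is relative to the globbed directory; only the
    // executor-side mesos-pyenv.sh knows the absolute location ($FWDIR).
    val basename = "spark-1"
    println(s"cd $basename*; ./sbin/mesos-pyenv.sh; " + quote(fakeBuildCommandSeq(".")))
  }
}
```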