CoarseMesosSchedulerBackend.scala
@@ -18,19 +18,21 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{List => JList}
import java.util.Collections
import java.util.{Collections, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.{Scheduler => MScheduler, _}

import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils
Contributor:
This set of utilities is not intended to be used outside of the deploy code (i.e. Spark's standalone scheduler); that's why it's causing issues here.

import org.apache.spark.executor.CoarseGrainedExecutorBackend
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -111,16 +113,16 @@ private[spark] class CoarseMesosSchedulerBackend(

def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
extraClassPath.foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
val classPathEntries = conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(File.pathSeparator)
}
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

val libraryPathOption = "spark.executor.extraLibraryPath"
val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
val libraryPathEntries =
conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
lp.split(File.pathSeparator)
}

sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
@@ -136,21 +138,21 @@
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)

val mesosCommand = Command(
classOf[CoarseGrainedExecutorBackend].getCanonicalName,
Seq(driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores.toString),
sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)

val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
command.setValue(CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
.format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores))
command.setValue(s"cd $basename*; " + CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
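For context on the Contributor's comment above: instead of hand-formatting the java invocation, the new code builds a `Command` and delegates to `CommandUtils.buildCommandSeq`. Below is a minimal sketch of the pattern, assuming the `Command` parameter order and the `buildCommandSeq(command, memory, sparkHome)` signature used in this diff (both helpers are internal to Spark's deploy code, which is exactly the concern raised above); the literal values are hypothetical stand-ins for what is normally derived from the SparkConf and the Mesos offer.

```scala
package org.apache.spark.scheduler.cluster.mesos

import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils

object MesosCommandSketch {
  def launchString(): String = {
    // Hypothetical values; the real code pulls these from the SparkConf and the offer.
    val driverUrl = "akka.tcp://spark@driver-host:7077/user/CoarseGrainedScheduler"
    val mesosCommand = Command(
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      Seq(driverUrl, "slave-1", "host-1", "4"),   // driverUrl, slaveId, hostname, numCores
      Map("SPARK_LOCAL_DIRS" -> "/mnt/spark"),    // executor environment
      Seq("/opt/extra/lib.jar"),                  // classPathEntries
      Seq("/opt/native"),                         // libraryPathEntries
      Seq("-verbose:gc"))                         // extra java options

    // buildCommandSeq returns the whole java invocation as a Seq[String]; wrapping each
    // token in double quotes (the mkString("\"", "\" \"", "\"") idiom in the diff) keeps
    // arguments containing spaces intact once Mesos hands the value to a shell.
    CommandUtils.buildCommandSeq(mesosCommand, 2048 /* executor memory in MB */, "/opt/spark")
      .mkString("\"", "\" \"", "\"")
  }
}
```

In the `spark.executor.uri` branch the same quoted string is simply prefixed with `cd $basename*; ` so it runs from the unpacked distribution.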
MesosSchedulerBackend.scala
@@ -18,20 +18,21 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import java.util.{Collections, ArrayList => JArrayList, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.{Scheduler => MScheduler, _}

import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils
import org.apache.spark.executor.MesosExecutorBackend
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}

/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -90,22 +91,39 @@ private[spark] class MesosSchedulerBackend(
"Spark home is not set; set it through the spark.home system " +
"property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(File.pathSeparator)
}
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

val libraryPathEntries =
sc.conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { lp =>
lp.split(File.pathSeparator)
}
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
.build())
}
val mesosCommand = Command(
classOf[MesosExecutorBackend].getCanonicalName,
Seq.empty, sc.executorEnvs, classPathEntries, libraryPathEntries, extraJavaOpts)
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)
val pyEnv = new File(sparkHome, "sbin/mesos-pyenv.sh").getCanonicalPath
command.setValue(s"$pyEnv; " + CommandUtils.buildCommandSeq(
mesosCommand, sc.executorMemory, sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.setValue(s"cd $basename*; ./sbin/mesos-pyenv.sh; " +
CommandUtils.buildCommandSeq(mesosCommand, sc.executorMemory, sparkHome)
.mkString("\"", "\" \"", "\""))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val memory = Resource.newBuilder()
sbin/spark-executor → sbin/mesos-pyenv.sh (6 changes: 3 additions & 3 deletions)
@@ -17,10 +17,10 @@
# limitations under the License.
#

#
# This script is only used for fine grained Mesos executors

FWDIR="$(cd `dirname $0`/..; pwd)"

export PYTHONPATH=$FWDIR/python:$PYTHONPATH
export PYTHONPATH=$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
Contributor:
It seems that we don't even need this file if we're doing this. We can just export the PYTHONPATH inside the mesos backend classes themselves.

Contributor Author:
I tried that, but left this file here because it seemed non-trivial to figure out $FWDIR from Scala code.

Contributor:
We can use sparkHome... we need it anyway to find this script.

Contributor Author:
But sparkHome is the driver-side Spark home directory, so it can't be used to assemble a command line that runs on the Mesos executor side. sbin/mesos-pyenv.sh, on the other hand, is always executed on the Mesos executor side, so $FWDIR always points to the right location.


echo "Running spark-executor with framework dir = $FWDIR"
exec $FWDIR/bin/spark-class org.apache.spark.executor.MesosExecutorBackend
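Regarding the PYTHONPATH thread above: here is a minimal sketch (not part of this diff) of the reviewer's alternative, exporting PYTHONPATH through the Mesos `Environment` builder in the backend itself. The helper name and paths are hypothetical, and the sketch illustrates the author's objection: `sparkHome` is resolved on the driver, so the exported path is only correct when executors unpack Spark at the same location, whereas `$FWDIR` in `mesos-pyenv.sh` is always resolved on the executor host.

```scala
import java.io.File

import org.apache.mesos.Protos.Environment

object PythonPathSketch {
  // Hypothetical helper: build PYTHONPATH driver-side and add it to the executor's environment.
  // Caveat from the discussion above: `sparkHome` here is the driver's Spark home, not the
  // Mesos slave's, so this only works if both hosts lay Spark out identically.
  def withPythonPath(environment: Environment.Builder, sparkHome: String): Environment.Builder = {
    val pythonPath = Seq(
      new File(sparkHome, "python").getAbsolutePath,
      new File(sparkHome, "python/lib/py4j-0.8.2.1-src.zip").getAbsolutePath
    ).mkString(File.pathSeparator)
    environment.addVariables(
      Environment.Variable.newBuilder().setName("PYTHONPATH").setValue(pythonPath).build())
  }
}
```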