CoarseMesosSchedulerBackend.scala
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{List => JList}
import java.util.Collections

@@ -28,9 +27,12 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.{SparkConf, Logging, SparkContext, SparkException}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils
import org.apache.spark.util.Utils

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -111,48 +113,43 @@ private[spark] class CoarseMesosSchedulerBackend(

def createCommand(offer: Offer, numCores: Int): CommandInfo = {
val environment = Environment.newBuilder()
val extraClassPath = conf.getOption("spark.executor.extraClassPath")
extraClassPath.foreach { cp =>
environment.addVariables(
Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
}
val mesosCommand = CommandInfo.newBuilder()
.setEnvironment(environment)

val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores.toString)
val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

val libraryPathOption = "spark.executor.extraLibraryPath"
val extraLibraryPath = conf.getOption(libraryPathOption).map(p => s"-Djava.library.path=$p")
val extraOpts = Seq(extraJavaOpts, extraLibraryPath).flatten.mkString(" ")
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts

sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
.setName(key)
.setValue(value)
.build())
val classPathEntries = conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val command = CommandInfo.newBuilder()
.setEnvironment(environment)
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val libraryPathEntries =
conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}

val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
Contributor:
Only passing sc.executorEnvs to the command object here is not enough, because we only use command with CommandUtils.buildCommandSeq below to generate the command-line string. In that case, command.environment is only used to run bin/compute-classpath (see here) and is not propagated to the target executor process.

Contributor Author:
Yes, I should set the environment on the CommandInfo here so that it propagates to the target executor process (see the sketch after this file's diff).

classPathEntries, libraryPathEntries, javaOpts)

val uri = conf.get("spark.executor.uri", null)
if (uri == null) {
val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d".format(
runScript, extraOpts, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
if ( uri == null ) {
mesosCommand.setValue(CommandUtils.buildCommandSeq(command, sc.executorMemory,
sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %s %d")
.format(basename, extraOpts, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
mesosCommand.setValue(CommandUtils.buildCommandSeq(command, sc.executorMemory,
basename).mkString("\"", "\" \"", "\""))
mesosCommand.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()
mesosCommand.build()
}

override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
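To make the reply above concrete, here is a minimal sketch (not part of this PR's diff; withExecutorEnv is a hypothetical helper name) of how the executor environment could be attached to the Mesos CommandInfo. Mesos exports the CommandInfo environment into the launched process, so variables set this way reach the executor itself rather than only the bin/compute-classpath run performed by CommandUtils.buildCommandSeq. It mirrors the environment loop the old code already had:

    import org.apache.mesos.Protos.{CommandInfo, Environment}

    // Copy the Spark executor environment (e.g. sc.executorEnvs) into the Mesos
    // CommandInfo environment; Mesos sets these variables in the process
    // environment of the launched executor command.
    def withExecutorEnv(executorEnvs: Iterable[(String, String)]): CommandInfo.Builder = {
      val environment = Environment.newBuilder()
      executorEnvs.foreach { case (key, value) =>
        environment.addVariables(Environment.Variable.newBuilder()
          .setName(key)
          .setValue(value)
          .build())
      }
      CommandInfo.newBuilder().setEnvironment(environment)
    }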
MesosSchedulerBackend.scala
@@ -17,7 +17,6 @@

package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections

@@ -29,9 +28,12 @@ import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}

import org.apache.spark.{Logging, SparkContext, SparkException, TaskState}
import org.apache.spark.{SparkConf, Logging, SparkContext, TaskState, SparkException}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SchedulerBackend, SlaveLost, TaskDescription, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.Utils
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.worker.CommandUtils
import org.apache.spark.util.Utils

/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
@@ -56,6 +58,7 @@ private[spark] class MesosSchedulerBackend(
// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
val taskIdToSlaveId = new HashMap[Long, String]
val conf = sc.conf

// An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null
@@ -96,26 +99,47 @@ private[spark] class MesosSchedulerBackend(
.setValue(value)
.build())
}
val command = CommandInfo.newBuilder()
val mesosCommand = CommandInfo.newBuilder()
.setEnvironment(environment)
val uri = sc.conf.get("spark.executor.uri", null)
if (uri == null) {
command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath)

val extraJavaOpts = conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)

// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts

val classPathEntries = conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}
val libraryPathEntries =
conf.getOption("spark.executor.extraLibraryPath").toSeq.flatMap { cp =>
cp.split(java.io.File.pathSeparator)
}

val command = Command(
"org.apache.spark.executor.MesosExecutorBackend", Nil, sc.executorEnvs,
classPathEntries, libraryPathEntries, javaOpts)
Contributor:
PYTHONPATH set by sbin/spark-executor is ignored here.

Contributor:
We should somehow set PYTHONPATH the same way sbin/spark-executor does.

Contributor Author:
I am wondering whether it would be OK to set PYTHONPATH in the environment as follows:
1. Add PYTHONPATH to the system env in the driver process.
2. Copy it into the Mesos Environment:

    val environment = Environment.newBuilder()
    sc.executorEnvs.foreach { case (key, value) =>
      environment.addVariables(Environment.Variable.newBuilder()
        .setName(key)
        .setValue(value)
        .build())
    }
    environment.addVariables(Environment.Variable.newBuilder()
      .setName("PYTHONPATH")
      .setValue(sys.env.getOrElse("PYTHONPATH", ""))
      .build())

Though in this way, maybe we cannot get PYTHONPATH from the system env in the executor process?
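One possible driver-side counterpart to step 1 above, hedged as an assumption about this Spark version's behavior (SparkConf.setExecutorEnv sets spark.executorEnv.* entries, which SparkContext copies into sc.executorEnvs): inject PYTHONPATH through the normal executor-env path so the loop in step 2 forwards it without special-casing:

    import org.apache.spark.SparkConf

    // Assumption: entries set via setExecutorEnv end up in sc.executorEnvs and are
    // therefore copied into the Mesos CommandInfo environment by the loop above.
    val conf = new SparkConf()
      .setExecutorEnv("PYTHONPATH", sys.env.getOrElse("PYTHONPATH", ""))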


val uri = conf.get("spark.executor.uri", null)
if ( uri == null ) {
mesosCommand.setValue(CommandUtils.buildCommandSeq(command, sc.executorMemory,
sparkHome).mkString("\"", "\" \"", "\""))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
mesosCommand.setValue(CommandUtils.buildCommandSeq(command, sc.executorMemory,
basename).mkString("\"", "\" \"", "\""))
mesosCommand.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}

val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setCommand(mesosCommand)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(memory)
.build()