4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -121,7 +121,9 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* Set the location where Spark is installed on worker nodes.
*/
def setSparkHome(home: String): SparkConf = {
Review comment (Contributor): Maybe should mark this as deprecated too?

set("spark.home", home)
set("spark.home", home) // deprecated
set("spark.driver.home", home)
set("spark.executor.home", home)
}

/** Set multiple parameters together */
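
For reference, a minimal sketch of what the reviewer's suggestion could look like (not part of this patch; the deprecation message and version string are assumptions, mirroring the @deprecated annotation added to SparkContext.getSparkHome below):

    /** Set the location where Spark is installed on worker nodes. */
    @deprecated("Use spark.driver.home and spark.executor.home instead", "1.1.0")
    def setSparkHome(home: String): SparkConf = {
      set("spark.home", home) // deprecated
      set("spark.driver.home", home)
      set("spark.executor.home", home)
    }
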
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1013,6 +1013,7 @@ class SparkContext(config: SparkConf) extends Logging {
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
@deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
private[spark] def getSparkHome(): Option[String] = {
conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME")))
}
@@ -465,6 +465,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
+ @deprecated("spark.home is deprecated; use spark.{driver/executor}.home instead", "1.1.0")
def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())

/**
@@ -22,8 +22,8 @@ private[spark] class ApplicationDescription(
val maxCores: Option[Int],
val memoryPerSlave: Int,
val command: Command,
- val sparkHome: Option[String],
var appUiUrl: String,
+ val executorSparkHome: Option[String] = None,
val eventLogDir: Option[String] = None)
extends Serializable {

5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -69,13 +69,16 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
val javaOpts = sys.props.get(javaOptionsConf)
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
+ // TODO: document this once standalone-cluster mode is fixed (SPARK-2260)
Review comment (Contributor): Does this get updated now?

+ val driverSparkHome = conf.getOption("spark.driver.home")

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
driverArgs.memory,
driverArgs.cores,
driverArgs.supervise,
- command)
+ command,
+ driverSparkHome)

masterActor ! RequestSubmitDriver(driverDescription)

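
Illustrative usage only (the key name comes from this patch; the path is a placeholder): an application submitted in standalone-cluster mode can now tell the worker which Spark installation to launch the driver with.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("example")
      .set("spark.driver.home", "/opt/spark") // Spark installation the worker uses to launch the driver
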
@@ -22,7 +22,8 @@ private[spark] class DriverDescription(
val mem: Int,
val cores: Int,
val supervise: Boolean,
- val command: Command)
+ val command: Command,
+ val driverSparkHome: Option[String] = None)
extends Serializable {

override def toString: String = s"DriverDescription (${command.mainClass})"
@@ -56,7 +56,7 @@ private[spark] object JsonProtocol {
("cores" -> obj.maxCores) ~
("memoryperslave" -> obj.memoryPerSlave) ~
("user" -> obj.user) ~
("sparkhome" -> obj.sparkHome) ~
("executorsparkhome" -> obj.executorSparkHome) ~
("command" -> obj.command.toString)
}

@@ -48,9 +48,8 @@ private[spark] object TestClient {
val conf = new SparkConf
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
- val desc = new ApplicationDescription(
- "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
- Seq()), Some("dummy-spark-home"), "ignored")
+ val desc = new ApplicationDescription("TestClient", Some(1), 512,
+ Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq()), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
@@ -36,6 +36,7 @@ import org.apache.spark.deploy.master.DriverState.DriverState

/**
* Manages the execution of one driver, including automatically restarting the driver on failure.
+ * This is currently only used by the standalone Worker in cluster deploy mode.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -30,6 +30,7 @@ import org.apache.spark.util.logging.FileAppender

/**
* Manages the execution of one executor process.
+ * This is currently used only by the standalone Worker.
*/
private[spark] class ExecutorRunner(
val appId: String,
25 changes: 13 additions & 12 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -81,7 +81,7 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
+ val workerSparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -106,12 +106,12 @@
def memoryFree: Int = memory - memoryUsed

def createWorkDir() {
- workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
+ workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(workerSparkHome, "work"))
try {
// This sporadically fails - not sure why ... !workDir.exists() && !workDir.mkdirs()
// So attempting to create and then check if directory was created or not.
workDir.mkdirs()
- if ( !workDir.exists() || !workDir.isDirectory) {
+ if (!workDir.exists() || !workDir.isDirectory) {
logError("Failed to create work directory " + workDir)
System.exit(1)
}
@@ -127,7 +127,7 @@
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo("Spark home: " + sparkHome)
logInfo("Worker Spark home: " + workerSparkHome)
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -232,10 +232,9 @@
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host,
- appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
- workDir, akkaUrl, conf, ExecutorState.RUNNING)
+ val execSparkHome = appDesc.executorSparkHome.map(new File(_)).getOrElse(workerSparkHome)
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self,
+ workerId, host, execSparkHome, workDir, akkaUrl, conf, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
@@ -264,7 +263,7 @@
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
executors.get(fullId) match {
- case Some(executor) =>
+ case Some(executor) =>
logInfo("Executor " + fullId + " finished with state " + state +
message.map(" message " + _).getOrElse("") +
exitStatus.map(" exitStatus " + _).getOrElse(""))
@@ -295,7 +294,7 @@

case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
- val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl)
+ val driverSparkHome = driverDesc.driverSparkHome.map(new File(_)).getOrElse(workerSparkHome)
+ val driver = new DriverRunner(driverId, workDir, driverSparkHome, driverDesc, self, akkaUrl)
drivers(driverId) = driver
driver.start()

@@ -381,7 +381,8 @@ private[spark] object Worker extends Logging {
cores: Int,
memory: Int,
masterUrls: Array[String],
- workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+ workDir: String,
+ workerNumber: Option[Int] = None): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val conf = new SparkConf
@@ -391,7 +392,7 @@
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf, securityManager = securityMgr)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
+ masterUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)
(actorSystem, boundPort)
}

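
A self-contained restatement of the fallback order the Worker now applies (the helper name resolveSparkHome is hypothetical, introduced only for illustration): a Spark home requested by the application or driver description wins, otherwise the worker falls back to its own SPARK_HOME directory.

    import java.io.File

    // Same logic as the Worker code above, factored into one place for clarity.
    def resolveSparkHome(requested: Option[String], workerSparkHome: File): File =
      requested.map(new File(_)).getOrElse(workerSparkHome)

    // e.g. resolveSparkHome(appDesc.executorSparkHome, workerSparkHome)
    //      resolveSparkHome(driverDesc.driverSparkHome, workerSparkHome)
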
@@ -57,9 +57,9 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
classPathEntries, libraryPathEntries, extraJavaOpts)
- val sparkHome = sc.getSparkHome()
+ val executorSparkHome = conf.getOption("spark.executor.home")
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
+ sc.ui.appUIAddress, executorSparkHome, sc.eventLogger.map(_.logDir))

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
@@ -71,9 +71,11 @@ private[spark] class CoarseMesosSchedulerBackend(
val taskIdToSlaveId = new HashMap[Int, String]
val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed

- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
+ private val sparkHome = sc.conf.getOption("spark.executor.home")
+ .orElse(sc.conf.getOption("spark.home")) // deprecated
+ .getOrElse {
+ throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
+ }

val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0)

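
The precedence applied by both Mesos backends after this change, restated as a sketch (the helper name mesosExecutorSparkHome is hypothetical): spark.executor.home is consulted first, then the deprecated spark.home, and an error is raised if neither is set.

    import org.apache.spark.{SparkConf, SparkException}

    def mesosExecutorSparkHome(conf: SparkConf): String =
      conf.getOption("spark.executor.home")
        .orElse(conf.getOption("spark.home")) // deprecated fallback
        .getOrElse(throw new SparkException(
          "Executor Spark home is not set; set it through spark.executor.home"))
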
@@ -62,6 +62,12 @@ private[spark] class MesosSchedulerBackend(

var classLoader: ClassLoader = null

+ private val sparkHome = sc.conf.getOption("spark.executor.home")
+ .orElse(sc.conf.getOption("spark.home")) // deprecated
+ .getOrElse {
+ throw new SparkException("Executor Spark home is not set; set it through spark.executor.home")
+ }

override def start() {
synchronized {
classLoader = Thread.currentThread.getContextClassLoader
@@ -86,9 +92,6 @@
}

def createExecutorInfo(execId: String): ExecutorInfo = {
- val sparkHome = sc.getSparkHome().getOrElse(throw new SparkException(
- "Spark home is not set; set it through the spark.home system " +
- "property, the SPARK_HOME environment variable or the SparkContext constructor"))
val environment = Environment.newBuilder()
sc.executorEnvs.foreach { case (key, value) =>
environment.addVariables(Environment.Variable.newBuilder()
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -34,7 +34,7 @@ import scala.language.postfixOps
class DriverSuite extends FunSuite with Timeouts {

test("driver should exit after finishing") {
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.props("spark.test.home")
// Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
forAll(masters) { (master: String) =>
2 changes: 0 additions & 2 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -45,14 +45,12 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {

conf.setMaster("local[3]")
conf.setAppName("My app")
- conf.setSparkHome("/path")
conf.setJars(Seq("a.jar", "b.jar"))
conf.setExecutorEnv("VAR1", "value1")
conf.setExecutorEnv(Seq(("VAR2", "value2"), ("VAR3", "value3")))

assert(conf.get("spark.master") === "local[3]")
assert(conf.get("spark.app.name") === "My app")
assert(conf.get("spark.home") === "/path")
assert(conf.get("spark.jars") === "a.jar,b.jar")
assert(conf.get("spark.executorEnv.VAR1") === "value1")
assert(conf.get("spark.executorEnv.VAR2") === "value2")
@@ -89,7 +89,7 @@ class JsonProtocolSuite extends FunSuite {

def createAppDesc(): ApplicationDescription = {
val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
- new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
+ new ApplicationDescription("name", Some(4), 1234, cmd, "appUiUrl", Some("executorSparkHome"))
}

def createAppInfo() : ApplicationInfo = {
@@ -169,7 +169,7 @@ object JsonConstants {
val appDescJsonStr =
"""
|{"name":"name","cores":4,"memoryperslave":1234,
|"user":"%s","sparkhome":"sparkHome",
|"user":"%s","executorsparkhome":"executorSparkHome",
|"command":"Command(mainClass,List(arg1, arg2),Map(),List(),List(),None)"}
""".format(System.getProperty("user.name", "<unknown>")).stripMargin

@@ -266,7 +266,7 @@ class SparkSubmitSuite extends FunSuite with Matchers {

// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String]): String = {
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+ val sparkHome = sys.props("spark.test.home")
Utils.executeAndGetOutput(
Seq("./bin/spark-submit") ++ args,
new File(sparkHome),
@@ -27,12 +27,11 @@ import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
- val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
+ val sparkHome = sys.props("spark.test.home")
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq()),
sparkHome, "appUiUrl")
Command("foo", Seq(), Map(), Seq(), Seq()), "appUiUrl")
val appId = "12345-worker321-9876"
- val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
+ val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)

assert(er.getCommandSeq.last === appId)
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -186,6 +186,15 @@ Apart from these, the following properties are also available, and may be useful
Set a special library path to use when launching executor JVM's.
</td>
</tr>
+ <tr>
+ <td><code>spark.executor.home</code></td>
+ <td>(none)</td>
+ <td>
+ Home directory of Spark installation to use when launching executors on the worker machines.
+ In standalone mode, the Worker's current working directory is used if this is not set. This
+ is not used in yarn mode.
+ </td>
+ </tr>
<tr>
<td><code>spark.files.userClassPathFirst</code></td>
<td>false</td>
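
A hedged usage example for the new property (the master URL and path are placeholders): it can be set like any other Spark configuration entry before the SparkContext is created.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("spark://master:7077")         // placeholder standalone master URL
      .setAppName("example")
      .set("spark.executor.home", "/opt/spark") // Spark installation on the worker machines
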
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
@@ -312,7 +312,8 @@ object TestSettings {
lazy val settings = Seq (
// Fork new JVMs for tests and set Java options for those
fork := true,
- javaOptions in Test += "-Dspark.home=" + sparkHome,
+ javaOptions in Test += "-Dspark.test.home=" + sparkHome,
+ javaOptions in Test += "-Dspark.executor.home=" + sparkHome, // For local-cluster mode
javaOptions in Test += "-Dspark.testing=1",
javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
10 changes: 2 additions & 8 deletions python/pyspark/conf.py
@@ -30,14 +30,9 @@
u'local'
>>> sc.appName
u'My app'
- >>> sc.sparkHome is None
+ >>> sc.sparkHome is not None
True

- >>> conf = SparkConf(loadDefaults=False)
- >>> conf.setSparkHome("/path")
- <pyspark.conf.SparkConf object at ...>
- >>> conf.get("spark.home")
- u'/path'
>>> conf.setExecutorEnv("VAR1", "value1")
<pyspark.conf.SparkConf object at ...>
>>> conf.setExecutorEnv(pairs = [("VAR3", "value3"), ("VAR4", "value4")])
@@ -48,9 +43,8 @@
spark.executorEnv.VAR1=value1
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
- spark.home=/path
>>> sorted(conf.getAll(), key=lambda p: p[0])
- [(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4'), (u'spark.home', u'/path')]
+ [(u'spark.executorEnv.VAR1', u'value1'), (u'spark.executorEnv.VAR3', u'value3'), (u'spark.executorEnv.VAR4', u'value4')]
"""


6 changes: 2 additions & 4 deletions python/pyspark/context.py
@@ -75,7 +75,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
@param serializer: The serializer for RDDs.
@param conf: A L{SparkConf} object setting Spark properties.
@param gateway: Use an existing gateway and JVM, otherwise a new JVM
- will be instatiated.
+ will be instantiated.


>>> from pyspark.context import SparkContext
@@ -108,8 +106,6 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
self._conf.setMaster(master)
if appName:
self._conf.setAppName(appName)
- if sparkHome:
- self._conf.setSparkHome(sparkHome)
if environment:
for key, value in environment.iteritems():
self._conf.setExecutorEnv(key, value)
@@ -124,7 +122,7 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
# the classpath or an external config file
self.master = self._conf.get("spark.master")
self.appName = self._conf.get("spark.app.name")
- self.sparkHome = self._conf.get("spark.home", None)
+ self.sparkHome = os.environ.get("SPARK_HOME")
for (k, v) in self._conf.getAll():
if k.startswith("spark.executorEnv."):
varName = k[len("spark.executorEnv."):]
3 changes: 0 additions & 3 deletions repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -951,9 +951,6 @@ class SparkILoop(in0: Option[BufferedReader], protected val out: JPrintWriter,
if (execUri != null) {
conf.set("spark.executor.uri", execUri)
}
- if (System.getenv("SPARK_HOME") != null) {
- conf.setSparkHome(System.getenv("SPARK_HOME"))
- }
sparkContext = new SparkContext(conf)
logInfo("Created spark context..")
sparkContext