3 changes: 0 additions & 3 deletions conf/spark-env.sh.template
@@ -25,12 +25,10 @@
 # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
 # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
-# - SPARK_CLASSPATH, default classpath entries to append
 
 # Options read by executors and drivers running inside the cluster
 # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node
 # - SPARK_PUBLIC_DNS, to set the public DNS name of the driver program
-# - SPARK_CLASSPATH, default classpath entries to append
 # - SPARK_LOCAL_DIRS, storage directories to use on this node for shuffle and RDD data
 # - MESOS_NATIVE_JAVA_LIBRARY, to point to your libmesos.so if you use Mesos
 
@@ -48,7 +46,6 @@
 # - SPARK_WORKER_CORES, to set the number of cores to use on this machine
 # - SPARK_WORKER_MEMORY, to set how much total memory workers have to give executors (e.g. 1000m, 2g)
 # - SPARK_WORKER_PORT / SPARK_WORKER_WEBUI_PORT, to use non-default ports for the worker
-# - SPARK_WORKER_INSTANCES, to set the number of worker processes per node
 # - SPARK_WORKER_DIR, to set the working directory of worker processes
 # - SPARK_WORKER_OPTS, to set config properties only for the worker (e.g. "-Dx=y")
 # - SPARK_DAEMON_MEMORY, to allocate to the master, worker and history server themselves (default: 1g).
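With SPARK_WORKER_INSTANCES gone from the template, executor count is controlled through configuration or spark-submit's --num-executors flag. A minimal Scala sketch of the config route; the app name, master, and the value 4 are placeholder examples:

    import org.apache.spark.{SparkConf, SparkContext}

    // Size the application explicitly instead of exporting SPARK_WORKER_INSTANCES;
    // "4" is an arbitrary example value.
    val conf = new SparkConf()
      .setAppName("instances-example")
      .setMaster("yarn") // spark.executor.instances applies on cluster managers such as YARN
      .set("spark.executor.instances", "4")
    val sc = new SparkContext(conf)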
65 changes: 0 additions & 65 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -518,71 +518,6 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
 }
 }
 
-// Check for legacy configs
-sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
-val warning =
-s"""
-|SPARK_JAVA_OPTS was detected (set to '$value').
-|This is deprecated in Spark 1.0+.
-|
-|Please instead use:
-| - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
-| - ./spark-submit with --driver-java-options to set -X options for a driver
-| - spark.executor.extraJavaOptions to set -X options for executors
-| - SPARK_DAEMON_JAVA_OPTS to set java options for standalone daemons (master or worker)
-""".stripMargin
-logWarning(warning)
-
-for (key <- Seq(executorOptsKey, driverOptsKey)) {
-if (getOption(key).isDefined) {
-throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
-} else {
-logWarning(s"Setting '$key' to '$value' as a work-around.")
-set(key, value)
-}
-}
-}
-
-sys.env.get("SPARK_CLASSPATH").foreach { value =>
-val warning =
-s"""
-|SPARK_CLASSPATH was detected (set to '$value').
-|This is deprecated in Spark 1.0+.
-|
-|Please instead use:
-| - ./spark-submit with --driver-class-path to augment the driver classpath
-| - spark.executor.extraClassPath to augment the executor classpath
-""".stripMargin
-logWarning(warning)
-
-for (key <- Seq(executorClasspathKey, driverClassPathKey)) {
-if (getOption(key).isDefined) {
-throw new SparkException(s"Found both $key and SPARK_CLASSPATH. Use only the former.")
-} else {
-logWarning(s"Setting '$key' to '$value' as a work-around.")
-set(key, value)
-}
-}
-}
-
-if (!contains(sparkExecutorInstances)) {
-sys.env.get("SPARK_WORKER_INSTANCES").foreach { value =>
-val warning =
-s"""
-|SPARK_WORKER_INSTANCES was detected (set to '$value').
-|This is deprecated in Spark 1.0+.
-|
-|Please instead use:
-| - ./spark-submit with --num-executors to specify the number of executors
-| - Or set SPARK_EXECUTOR_INSTANCES
-| - spark.executor.instances to configure the number of instances in the spark config.
-""".stripMargin
-logWarning(warning)
-
-set("spark.executor.instances", value)
-}
-}
-
 if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
 val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
 "instead use \"yarn\" with specified deploy mode."
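With these fallbacks deleted, SparkConf no longer copies SPARK_JAVA_OPTS or SPARK_CLASSPATH into the replacement settings, so applications must set those keys themselves. A minimal sketch using the replacements named in the removed warnings; the JVM flag and path values are placeholders:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Was SPARK_JAVA_OPTS: JVM options are now set per role.
      .set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
      .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
      // Was SPARK_CLASSPATH: extra classpath entries are also per role.
      .set("spark.driver.extraClassPath", "/opt/extra/lib/*")
      .set("spark.executor.extraClassPath", "/opt/extra/lib/*")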
core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -43,8 +43,7 @@ import org.apache.spark.util.{ThreadUtils, Utils}
 * Execute using
 * ./bin/spark-class org.apache.spark.deploy.FaultToleranceTest
 *
-* Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS
-* *and* SPARK_JAVA_OPTS:
+* Make sure that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
 * - spark.deploy.recoveryMode=ZOOKEEPER
 * - spark.deploy.zookeeper.url=172.17.42.1:2181
 * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
core/src/main/scala/org/apache/spark/deploy/worker/WorkerCommandBuilder.scala
@@ -39,7 +39,6 @@ private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, comm
 val cmd = buildJavaCommand(command.classPathEntries.mkString(File.pathSeparator))
 cmd.add(s"-Xmx${memoryMb}M")
 command.javaOpts.foreach(cmd.add)
-addOptionString(cmd, getenv("SPARK_JAVA_OPTS"))
 cmd
 }
 
2 changes: 1 addition & 1 deletion docs/rdd-programming-guide.md
@@ -457,7 +457,7 @@ If required, a Hadoop configuration can be passed in as a Python dict. Here is a
 Elasticsearch ESInputFormat:
 
 {% highlight python %}
-$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
+$ ./bin/pyspark --jars /path/to/elasticsearch-hadoop.jar
 >>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
 >>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
 "org.apache.hadoop.io.NullWritable",
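The same replacement works from code: instead of exporting SPARK_CLASSPATH, ship the jar with --jars or set spark.jars before creating the context. A hedged Scala sketch of the doc's example; the jar path is a placeholder:

    import org.apache.spark.SparkConf

    // spark.jars distributes the listed jars and puts them on the
    // driver and executor classpaths, like the --jars flag.
    val conf = new SparkConf()
      .set("spark.jars", "/path/to/elasticsearch-hadoop.jar")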
launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java
@@ -135,7 +135,6 @@ List<String> buildClassPath(String appClassPath) throws IOException {
 String sparkHome = getSparkHome();
 
 Set<String> cp = new LinkedHashSet<>();
-addToClassPath(cp, getenv("SPARK_CLASSPATH"));
 addToClassPath(cp, appClassPath);
 
 addToClassPath(cp, getConfDir());
launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java
@@ -66,7 +66,6 @@ public List<String> buildCommand(Map<String, String> env)
memKey = "SPARK_DAEMON_MEMORY";
break;
case "org.apache.spark.executor.CoarseGrainedExecutorBackend":
javaOptsKeys.add("SPARK_JAVA_OPTS");
javaOptsKeys.add("SPARK_EXECUTOR_OPTS");
memKey = "SPARK_EXECUTOR_MEMORY";
break;
@@ -84,7 +83,6 @@
 memKey = "SPARK_DAEMON_MEMORY";
 break;
 default:
-javaOptsKeys.add("SPARK_JAVA_OPTS");
 memKey = "SPARK_DRIVER_MEMORY";
 break;
 }
launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java
@@ -240,7 +240,6 @@ private List<String> buildSparkSubmitCommand(Map<String, String> env)
addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
}
addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));
addOptionString(cmd, System.getenv("SPARK_JAVA_OPTS"));

// We don't want the client to specify Xmx. These have to be set by their corresponding
// memory flag --driver-memory or configuration entry spark.driver.memory
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -175,11 +175,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
 
 def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
 val environment = Environment.newBuilder()
-val extraClassPath = conf.getOption("spark.executor.extraClassPath")
-extraClassPath.foreach { cp =>
-environment.addVariables(
-Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
-}
 val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "")
 
 // Set the environment variable through a command prefix
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -106,10 +106,6 @@ private[spark] class MesosFineGrainedSchedulerBackend(
 throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!")
 }
 val environment = Environment.newBuilder()
-sc.conf.getOption("spark.executor.extraClassPath").foreach { cp =>
-environment.addVariables(
-Environment.Variable.newBuilder().setName("SPARK_CLASSPATH").setValue(cp).build())
-}
 val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions").getOrElse("")
 
 val prefixEnv = sc.conf.getOption("spark.executor.extraLibraryPath").map { p =>
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -748,14 +748,6 @@ private[spark] class Client(
 .map { case (k, v) => (k.substring(amEnvPrefix.length), v) }
 .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) }
 
-// Keep this for backwards compatibility but users should move to the config
-sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-// Allow users to specify some environment variables.
-YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
-// Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments.
-env("SPARK_YARN_USER_ENV") = userEnvs
-}
-
 // If pyFiles contains any .py files, we need to add LOCALIZED_PYTHON_DIR to the PYTHONPATH
 // of the container processes too. Add all non-.py files directly to PYTHONPATH.
 //
@@ -782,35 +774,7 @@
 sparkConf.setExecutorEnv("PYTHONPATH", pythonPathStr)
 }
 
-// In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to
-// executors. But we can't just set spark.executor.extraJavaOptions, because the driver's
-// SparkContext will not let that set spark* system properties, which is expected behavior for
-// Yarn clients. So propagate it through the environment.
-//
-// Note that to warn the user about the deprecation in cluster mode, some code from
-// SparkConf#validateSettings() is duplicated here (to avoid triggering the condition
-// described above).
 if (isClusterMode) {
-sys.env.get("SPARK_JAVA_OPTS").foreach { value =>
-val warning =
-s"""
-|SPARK_JAVA_OPTS was detected (set to '$value').
-|This is deprecated in Spark 1.0+.
-|
-|Please instead use:
-| - ./spark-submit with conf/spark-defaults.conf to set defaults for an application
-| - ./spark-submit with --driver-java-options to set -X options for a driver
-| - spark.executor.extraJavaOptions to set -X options for executors
-""".stripMargin
-logWarning(warning)
-for (proc <- Seq("driver", "executor")) {
-val key = s"spark.$proc.extraJavaOptions"
-if (sparkConf.contains(key)) {
-throw new SparkException(s"Found both $key and SPARK_JAVA_OPTS. Use only the former.")
-}
-}
-env("SPARK_JAVA_OPTS") = value
-}
 // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode
 Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
 if (!env.contains(envname)) {
@@ -883,8 +847,7 @@
 
 // Include driver-specific java options if we are launching a driver
 if (isClusterMode) {
-val driverOpts = sparkConf.get(DRIVER_JAVA_OPTIONS).orElse(sys.env.get("SPARK_JAVA_OPTS"))
-driverOpts.foreach { opts =>
+sparkConf.get(DRIVER_JAVA_OPTIONS).foreach { opts =>
 javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
 }
 val libraryPaths = Seq(sparkConf.get(DRIVER_LIBRARY_PATH),
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -143,9 +143,6 @@ private[yarn] class ExecutorRunnable(
 sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts =>
 javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
 }
-sys.env.get("SPARK_JAVA_OPTS").foreach { opts =>
-javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
-}
 sparkConf.get(EXECUTOR_LIBRARY_PATH).foreach { p =>
 prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
 }
@@ -229,11 +226,6 @@ private[yarn] class ExecutorRunnable(
 YarnSparkHadoopUtil.addPathToEnvironment(env, key, value)
 }
 
-// Keep this for backwards compatibility but users should move to the config
-sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs =>
-YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs)
-}
-
 // lookup appropriate http scheme for container log urls
 val yarnHttpPolicy = conf.get(
 YarnConfiguration.YARN_HTTP_POLICY_KEY,
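With SPARK_YARN_USER_ENV removed from both Client and ExecutorRunnable, per-container environment variables now come from configuration alone. A minimal sketch, assuming the standard spark.yarn.appMasterEnv.* and spark.executorEnv.* keys; the variable name and value are placeholders:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // Environment for the YARN application master (and the driver in cluster mode):
      .set("spark.yarn.appMasterEnv.ENV_NAME", "some-value")
      // Environment for executors; setExecutorEnv is shorthand for spark.executorEnv.*:
      .setExecutorEnv("ENV_NAME", "some-value")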