SPARK-2058: Overriding config from SPARK_HOME with SPARK_CONF_DIR #997
Changes from 4 commits: 5ede2c0, 96e3fdd, 69d337e, 99a4341, 5357369, d2d1543, 186c975
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

@@ -47,7 +47,7 @@ object SparkSubmit {
   private val PYSPARK_SHELL = "pyspark-shell"

   def main(args: Array[String]) {
-    val appArgs = new SparkSubmitArguments(args)
+    val appArgs = new SparkSubmitArguments(args, sys.env)
Contributor:
No need to pass sys.env here; it's already the default value of the parameter.
     if (appArgs.verbose) {
       printStream.println(appArgs)
     }
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

@@ -30,7 +30,7 @@ import org.apache.spark.util.Utils
 /**
  * Parses and encapsulates arguments from the spark-submit script.
  */
-private[spark] class SparkSubmitArguments(args: Seq[String]) {
+private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
Contributor:
No need to accept an extra argument here. This will always read from the JVM directly anyway.

Contributor:
I see, you use this for tests. Then it makes sense to add a comment here to explain that.
   var master: String = null
   var deployMode: String = null
   var executorMemory: String = null
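The second comment above asks for a note explaining why the parameter exists. A sketch of what such a comment could look like (illustrative only, not the committed code):

private[spark] class SparkSubmitArguments(
    args: Seq[String],
    // Overridable so unit tests can inject a fake environment;
    // production call sites rely on the default, the real JVM environment.
    env: Map[String, String] = sys.env) {
  // ... existing argument parsing ...
}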
@@ -83,9 +83,12 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {

   // Use common defaults file, if not specified by user
   if (propertiesFile == null) {
-    sys.env.get("SPARK_HOME").foreach { sparkHome =>
-      val sep = File.separator
-      val defaultPath = s"${sparkHome}${sep}conf${sep}spark-defaults.conf"
+    val sep = File.separator
+    val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
+
+    // give preference to user defined conf over the one in spark home
+    env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig).foreach { configPath =>
+      val defaultPath = s"${configPath}${sep}spark-defaults.conf"
       val file = new File(defaultPath)
       if (file.exists()) {
         propertiesFile = file.getAbsolutePath
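The net effect of this hunk is a simple precedence chain: an explicit --properties-file still wins (the surrounding null check), then SPARK_CONF_DIR, then SPARK_HOME/conf. A minimal standalone sketch of that lookup, using illustrative names (DefaultsFileResolution, resolveDefaultsFile) that are not part of the patch:

import java.io.File

object DefaultsFileResolution {
  // SPARK_CONF_DIR, when set, shadows the conf directory under SPARK_HOME.
  def resolveDefaultsFile(env: Map[String, String]): Option[String] = {
    val sep = File.separator
    val sparkHomeConf = env.get("SPARK_HOME").map(home => s"$home${sep}conf")
    env.get("SPARK_CONF_DIR").orElse(sparkHomeConf)
      .map(dir => new File(s"$dir${sep}spark-defaults.conf"))
      .filter(_.exists())          // silently skip a missing defaults file
      .map(_.getAbsolutePath)
  }
}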
@@ -161,7 +164,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     }

     if (master.startsWith("yarn")) {
-      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+      val hasHadoopEnv = env.contains("HADOOP_CONF_DIR") || env.contains("YARN_CONF_DIR")
       if (!hasHadoopEnv && !Utils.isTesting) {
         throw new Exception(s"When running with master '$master' " +
           "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

@@ -17,7 +17,7 @@

 package org.apache.spark.deploy

-import java.io.{File, OutputStream, PrintStream}
+import java.io._

 import scala.collection.mutable.ArrayBuffer
@@ -264,6 +264,21 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers {
     runSparkSubmit(args)
   }

+  test("SPARK_CONF_DIR overrides spark-defaults.conf") {
+    forConfDir(Map("spark.executor.memory" -> "2.3g")) { path =>
+      val unusedJar = TestUtils.createJarWithClasses(Seq.empty)
+      val args = Seq(
+        "--class", SimpleApplicationTest.getClass.getName.stripSuffix("$"),
+        "--name", "testApp",
+        "--master", "local",
+        unusedJar.toString)
+      val appArgs = new SparkSubmitArguments(args, Map("SPARK_CONF_DIR" -> path))
+      assert(appArgs.propertiesFile != null)
+      assert(appArgs.propertiesFile.startsWith(path))
+      appArgs.executorMemory should be ("2.3g")
+    }
+  }
+
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
   def runSparkSubmit(args: Seq[String]): String = {
     val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
@@ -272,6 +287,24 @@
       new File(sparkHome),
       Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
   }

+  def forConfDir(defaults: Map[String, String]) (f: String => Unit) = {
+    val tmpDir = new File("TMP_SPARK_CONF_DIR")
Contributor:
Use Files.createTempDir(), and delete it when the test is done (using scalatest's BeforeAndAfter); there are several examples of this in other tests.

Contributor:
Just a clarification: I see you're deleting the directory below, so BeforeAndAfter does not apply here. But still I'd prefer to use Files.createTempDir() instead of a hardcoded name. Just for paranoia.
+    tmpDir.mkdir()
+
+    val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
+    val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
+    for ((key, value) <- defaults) writer.write(key + " " + value)
Contributor:
Don't you need a newline at the end there?
+    writer.flush()
Contributor:
Sort of redundant with close() on the next line.
+    writer.close()
+
+    try {
+      f(tmpDir.getAbsolutePath)
+    } finally {
+      defaultsConf.delete()
Member:
While we're here, this can be one call to Utils.deleteRecursively(tmpDir).
+      tmpDir.delete()
+    }
+  }
 }

 object JarCreationTest {
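Pulling the review suggestions together, the helper could end up roughly like the sketch below. It assumes Guava's Files.createTempDir() and Spark's Utils.deleteRecursively, both used elsewhere in the test suite; this is a sketch, not the code that was actually committed:

import java.io.{File, FileOutputStream, OutputStreamWriter}

import com.google.common.io.Files

import org.apache.spark.util.Utils

def forConfDir(defaults: Map[String, String])(f: String => Unit): Unit = {
  // Random temp dir instead of the hardcoded "TMP_SPARK_CONF_DIR", per the review.
  val tmpDir = Files.createTempDir()

  val defaultsConf = new File(tmpDir.getAbsolutePath, "spark-defaults.conf")
  val writer = new OutputStreamWriter(new FileOutputStream(defaultsConf))
  // Newline-terminate each entry so multiple defaults don't run together.
  for ((key, value) <- defaults) writer.write(s"$key $value\n")
  writer.close() // close() already flushes, so the separate flush() goes away

  try {
    f(tmpDir.getAbsolutePath)
  } finally {
    // One call removes both the conf file and the directory.
    Utils.deleteRecursively(tmpDir)
  }
}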
Does the configuration dir need to be added to the classpath? The code, at least the part you're modifying below, doesn't seem to require that.

Indeed, it's not used in SparkSubmit itself, but this is done in order to provide the user-defined config to other components, such as logging, the scheduler, and metrics.

Also, I think the worker needs to use the config in standalone mode, so having the conf directory on the classpath is useful.
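To illustrate that point (an assumption about how the surrounding components behave, not something this diff shows): log4j and the metrics system look their configuration files up on the classpath, so SPARK_CONF_DIR only reaches them once the directory is on it. The object name below is hypothetical:

object ConfOnClasspathCheck extends App {
  // Both lookups return None unless the directory holding these files
  // (e.g. the one SPARK_CONF_DIR points at) is on the classpath.
  val cl = Thread.currentThread().getContextClassLoader
  val log4jConf = Option(cl.getResource("log4j.properties"))
  val metricsConf = Option(cl.getResource("metrics.properties"))
  println(s"log4j config: $log4jConf, metrics config: $metricsConf")
}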