
Commit 293a0b5

witgo authored and andrewor14 committed
[SPARK-2098] All Spark processes should support spark-defaults.conf, config file
This is another implementation of apache#1256. cc andrewor14 vanzin

Author: GuoQiang Li <[email protected]>

Closes apache#2379 from witgo/SPARK-2098-new and squashes the following commits:

4ef1cbd [GuoQiang Li] review commit
49ef70e [GuoQiang Li] Refactor getDefaultPropertiesFile
c45d20c [GuoQiang Li] All Spark processes should support spark-defaults.conf, config file
1 parent 18ab6bd commit 293a0b5
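At a glance, this gives the standalone Master, Worker, and HistoryServer the same defaults-file handling that spark-submit already has: a new --properties-file flag, falling back to spark-defaults.conf under SPARK_CONF_DIR or SPARK_HOME/conf. A minimal sketch of the effect using the helpers added in Utils.scala below; the driver object and the config value shown are made up for illustration:

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object DefaultsDemo {   // hypothetical driver, not part of this commit
  def main(args: Array[String]): Unit = {
    // Suppose spark-defaults.conf contains whitespace-separated pairs, e.g.
    //   spark.history.fs.logDirectory   /tmp/spark-events   (illustrative value)
    val conf = new SparkConf()
    val usedFile = Utils.loadDefaultSparkProperties(conf)    // no path given => resolve the default file
    println(s"Defaults loaded from: $usedFile")               // null when no file was found
    println(conf.get("spark.history.fs.logDirectory", "(unset)"))
  }
}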

File tree

8 files changed: +124, -50 lines


core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 3 additions & 39 deletions
@@ -17,14 +17,11 @@
 
 package org.apache.spark.deploy
 
-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
 import java.util.jar.JarFile
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkException
 import org.apache.spark.util.Utils
 
 /**
@@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
     val defaultProperties = new HashMap[String, String]()
     if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
     Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
+      Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
+        if (k.startsWith("spark.")) {
          defaultProperties(k) = v
          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
        } else {
@@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
   */
  private def mergeSparkProperties(): Unit = {
    // Use common defaults file, if not specified by user
-    if (propertiesFile == null) {
-      val sep = File.separator
-      val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
-      val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)
-
-      confDir.foreach { sparkConfDir =>
-        val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
-        val file = new File(defaultPath)
-        if (file.exists()) {
-          propertiesFile = file.getAbsolutePath
-        }
-      }
-    }
+    propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile(env))
 
    val properties = HashMap[String, String]()
    properties.putAll(defaultSparkProperties)
@@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
    SparkSubmit.exitFn()
  }
 }
-
-object SparkSubmitArguments {
-  /** Load properties present in the given file. */
-  def getPropertiesFromFile(file: File): Seq[(String, String)] = {
-    require(file.exists(), s"Properties file $file does not exist")
-    require(file.isFile(), s"Properties file $file is not a normal file")
-    val inputStream = new FileInputStream(file)
-    try {
-      val properties = new Properties()
-      properties.load(inputStream)
-      properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
-    } catch {
-      case e: IOException =>
-        val message = s"Failed when loading Spark properties file $file"
-        throw new SparkException(message, e)
-    } finally {
-      inputStream.close()
-    }
-  }
-}

core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
    assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")
 
    // Parse the properties file for the equivalent spark.driver.* configs
-    val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
+    val properties = Utils.getPropertiesFromFile(propertiesFile)
    val confDriverMemory = properties.get("spark.driver.memory")
    val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
    val confClasspath = properties.get("spark.driver.extraClassPath")

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

Lines changed: 15 additions & 1 deletion
@@ -18,12 +18,14 @@
 package org.apache.spark.deploy.history
 
 import org.apache.spark.SparkConf
+import org.apache.spark.util.Utils
 
 /**
  * Command-line parser for the master.
  */
 private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
   private var logDir: String = null
+  private var propertiesFile: String = null
 
   parse(args.toList)
 
@@ -32,22 +34,34 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
      case ("--dir" | "-d") :: value :: tail =>
        logDir = value
        conf.set("spark.history.fs.logDirectory", value)
+        System.setProperty("spark.history.fs.logDirectory", value)
        parse(tail)
 
      case ("--help" | "-h") :: tail =>
        printUsageAndExit(0)
 
+      case ("--properties-file") :: value :: tail =>
+        propertiesFile = value
+        parse(tail)
+
      case Nil =>
 
      case _ =>
        printUsageAndExit(1)
    }
  }
 
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
  private def printUsageAndExit(exitCode: Int) {
    System.err.println(
      """
-      |Usage: HistoryServer
+      |Usage: HistoryServer [options]
+      |
+      |Options:
+      |  --properties-file FILE      Path to a custom Spark properties file.
+      |                              Default is conf/spark-defaults.conf.
      |
      |Configuration options can be set by setting the corresponding JVM system property.
      |History Server options are always available; additional options depend on the provider.

core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala

Lines changed: 15 additions & 4 deletions
@@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
  var host = Utils.localHostName()
  var port = 7077
  var webUiPort = 8080
+  var propertiesFile: String = null
 
  // Check for settings in environment variables
  if (System.getenv("SPARK_MASTER_HOST") != null) {
@@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
  if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
  }
+
+  parse(args.toList)
+
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
  if (conf.contains("spark.master.ui.port")) {
    webUiPort = conf.get("spark.master.ui.port").toInt
  }
 
-  parse(args.toList)
-
  def parse(args: List[String]): Unit = args match {
    case ("--ip" | "-i") :: value :: tail =>
      Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
@@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
      webUiPort = value
      parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
      printUsageAndExit(0)
 
    case Nil => {}
@@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
      "  -i HOST, --ip HOST     Hostname to listen on (deprecated, please use --host or -h) \n" +
      "  -h HOST, --host HOST   Hostname to listen on\n" +
      "  -p PORT, --port PORT   Port to listen on (default: 7077)\n" +
-      "  --webui-port PORT      Port for web UI (default: 8080)")
+      "  --webui-port PORT      Port for web UI (default: 8080)\n" +
+      "  --properties-file FILE Path to a custom Spark properties file.\n" +
+      "                         Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
 }

core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala

Lines changed: 16 additions & 5 deletions
@@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
  var memory = inferDefaultMemory()
  var masters: Array[String] = null
  var workDir: String = null
+  var propertiesFile: String = null
 
  // Check for settings in environment variables
  if (System.getenv("SPARK_WORKER_PORT") != null) {
@@ -47,15 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
  if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
    webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
  }
-  if (conf.contains("spark.worker.ui.port")) {
-    webUiPort = conf.get("spark.worker.ui.port").toInt
-  }
  if (System.getenv("SPARK_WORKER_DIR") != null) {
    workDir = System.getenv("SPARK_WORKER_DIR")
  }
 
  parse(args.toList)
 
+  // This mutates the SparkConf, so all accesses to it must be made after this line
+  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
+
+  if (conf.contains("spark.worker.ui.port")) {
+    webUiPort = conf.get("spark.worker.ui.port").toInt
+  }
+
  checkWorkerMemory()
 
  def parse(args: List[String]): Unit = args match {
@@ -89,7 +94,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
      webUiPort = value
      parse(tail)
 
-    case ("--help" | "-h") :: tail =>
+    case ("--properties-file") :: value :: tail =>
+      propertiesFile = value
+      parse(tail)
+
+    case ("--help") :: tail =>
      printUsageAndExit(0)
 
    case value :: tail =>
@@ -124,7 +133,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
      "  -i HOST, --ip IP         Hostname to listen on (deprecated, please use --host or -h)\n" +
      "  -h HOST, --host HOST     Hostname to listen on\n" +
      "  -p PORT, --port PORT     Port to listen on (default: random)\n" +
-      "  --webui-port PORT        Port for web UI (default: 8081)")
+      "  --webui-port PORT        Port for web UI (default: 8081)\n" +
+      "  --properties-file FILE   Path to a custom Spark properties file.\n" +
+      "                           Default is conf/spark-defaults.conf.")
    System.exit(exitCode)
  }
 

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 48 additions & 0 deletions
@@ -1410,6 +1410,54 @@ private[spark] object Utils extends Logging {
    }
  }
 
+  /**
+   * Load default Spark properties from the given file. If no file is provided,
+   * use the common defaults file. This mutates state in the given SparkConf and
+   * in this JVM's system properties if the config specified in the file is not
+   * already set. Return the path of the properties file used.
+   */
+  def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
+    val path = Option(filePath).getOrElse(getDefaultPropertiesFile())
+    Option(path).foreach { confFile =>
+      getPropertiesFromFile(confFile).filter { case (k, v) =>
+        k.startsWith("spark.")
+      }.foreach { case (k, v) =>
+        conf.setIfMissing(k, v)
+        sys.props.getOrElseUpdate(k, v)
+      }
+    }
+    path
+  }
+
+  /** Load properties present in the given file. */
+  def getPropertiesFromFile(filename: String): Map[String, String] = {
+    val file = new File(filename)
+    require(file.exists(), s"Properties file $file does not exist")
+    require(file.isFile(), s"Properties file $file is not a normal file")
+
+    val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
+    try {
+      val properties = new Properties()
+      properties.load(inReader)
+      properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
+    } catch {
+      case e: IOException =>
+        throw new SparkException(s"Failed when loading Spark properties from $filename", e)
+    } finally {
+      inReader.close()
+    }
+  }
+
+  /** Return the path of the default Spark properties file. */
+  def getDefaultPropertiesFile(env: Map[String, String] = sys.env): String = {
+    env.get("SPARK_CONF_DIR")
+      .orElse(env.get("SPARK_HOME").map { t => s"$t${File.separator}conf" })
+      .map { t => new File(s"$t${File.separator}spark-defaults.conf")}
+      .filter(_.isFile)
+      .map(_.getAbsolutePath)
+      .orNull
+  }
+
  /** Return a nice string representation of the exception, including the stack trace. */
  def exceptionString(e: Exception): String = {
    if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
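The "mutates the SparkConf" comment repeated in the Master, Worker, and HistoryServer diffs is worth spelling out: loadDefaultSparkProperties only calls conf.setIfMissing and sys.props.getOrElseUpdate, so any key already present in the conf or in the JVM system properties keeps its value and the file only fills gaps. A rough sketch of the intended parser pattern, with a hypothetical daemon class and config key:

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

// Rough sketch only; "MyDaemonArguments" and "spark.mydaemon.ui.port" are hypothetical.
class MyDaemonArguments(args: Array[String], conf: SparkConf) {
  var propertiesFile: String = null   // would be set by a --properties-file flag during parsing

  // Merge file-based defaults. setIfMissing/getOrElseUpdate semantics mean values
  // already set (e.g. via explicit flags or system properties) win over the file.
  propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

  // Read the conf only after the merge, per the "all accesses ... after this line" comment.
  val webUiPort = conf.getInt("spark.mydaemon.ui.port", 8080)
}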

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 19 additions & 0 deletions
@@ -27,6 +27,8 @@ import com.google.common.base.Charsets
 import com.google.common.io.Files
 import org.scalatest.FunSuite
 
+import org.apache.spark.SparkConf
+
 class UtilsSuite extends FunSuite {
 
  test("bytesToString") {
@@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
    assert(!tempFile2.exists())
  }
 
+  test("loading properties from file") {
+    val outFile = File.createTempFile("test-load-spark-properties", "test")
+    try {
+      System.setProperty("spark.test.fileNameLoadB", "2")
+      Files.write("spark.test.fileNameLoadA true\n" +
+        "spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
+      val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
+      properties
+        .filter { case (k, v) => k.startsWith("spark.")}
+        .foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
+      val sparkConf = new SparkConf
+      assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
+      assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
+    } finally {
+      outFile.delete()
+    }
+  }
 }

docs/monitoring.md

Lines changed: 7 additions & 0 deletions
@@ -77,6 +77,13 @@ follows:
      one implementation, provided by Spark, which looks for application logs stored in the
      file system.</td>
  </tr>
+  <tr>
+    <td>spark.history.fs.logDirectory</td>
+    <td>(none)</td>
+    <td>
+      Directory that contains application event logs to be loaded by the history server
+    </td>
+  </tr>
  <tr>
    <td>spark.history.fs.updateInterval</td>
    <td>10</td>
