@@ -24,76 +24,94 @@ import scala.collection.JavaConversions._
2424import org .apache .spark .util .{RedirectThread , Utils }
2525
2626/**
27- * Wrapper of `bin/spark-class` that prepares the launch environment of the child JVM properly.
27+ * Launch an application through Spark submit in client mode with the appropriate classpath,
28+ * library paths, java options and memory. These properties of the JVM must be set before the
29+ * driver JVM is launched. The sole purpose of this class is to avoid handling the complexity
30+ * of parsing the properties file for such relevant configs in BASH.
31+ *
32+ * Usage: org.apache.spark.deploy.SparkClassLauncher <application args>
2833 */
2934private [spark] object SparkClassLauncher {
3035
31- // TODO : This is currently only used for running Spark submit in client mode .
32- // The goal moving forward is to use this class for all use cases of `bin/spark-class` .
36+ // Note : This class depends on the behavior of `bin/spark-class` and `bin/spark-submit` .
37+ // Any changes made there must be reflected in this file .
3338
34- /**
35- * Launch a Spark class with the given class paths, library paths, java options and memory,
36- * taking into account special `spark.driver.*` properties needed to start the driver JVM.
37- */
3839 def main (args : Array [String ]): Unit = {
39- if (args.size < 7 ) {
40- System .err.println(
41- """
42- |Usage: org.apache.spark.deploy.SparkClassLauncher
43- |
44- | [properties file] - path to your Spark properties file
45- | [java runner] - command to launch the child JVM
46- | [java class paths] - class paths to pass to the child JVM
47- | [java library paths] - library paths to pass to the child JVM
48- | [java opts] - java options to pass to the child JVM
49- | [java memory] - memory used to launch the child JVM
50- | [main class] - main class to run in the child JVM
51- | <main args> - arguments passed to this main class
52- |
53- |Example:
54- | org.apache.spark.deploy.SparkClassLauncher.SparkClassLauncher
55- | conf/spark-defaults.conf java /classpath1:/classpath2 /librarypath1:/librarypath2
56- | "-XX:-UseParallelGC -Dsome=property" 5g org.apache.spark.deploy.SparkSubmit
57- | --master local --class org.apache.spark.examples.SparkPi 10
58- """ .stripMargin)
59- System .exit(1 )
60- }
61- val propertiesFile = args(0 )
62- val javaRunner = args(1 )
63- val clClassPaths = args(2 )
64- val clLibraryPaths = args(3 )
65- val clJavaOpts = Utils .splitCommandString(args(4 ))
66- val clJavaMemory = args(5 )
67- val mainClass = args(6 )
40+ val submitArgs = args
41+ val runner = sys.env(" RUNNER" )
42+ val classpath = sys.env(" CLASSPATH" )
43+ val javaOpts = sys.env(" JAVA_OPTS" )
44+ val defaultDriverMemory = sys.env(" OUR_JAVA_MEM" )
45+
46+ // Spark submit specific environment variables
47+ val deployMode = sys.env(" SPARK_SUBMIT_DEPLOY_MODE" )
48+ val propertiesFile = sys.env(" SPARK_SUBMIT_PROPERTIES_FILE" )
49+ val bootstrapDriver = sys.env(" SPARK_SUBMIT_BOOTSTRAP_DRIVER" )
50+ val submitDriverMemory = sys.env.get(" SPARK_SUBMIT_DRIVER_MEMORY" )
51+ val submitLibraryPath = sys.env.get(" SPARK_SUBMIT_LIBRARY_PATH" )
52+ val submitClasspath = sys.env.get(" SPARK_SUBMIT_CLASSPATH" )
53+ val submitJavaOpts = sys.env.get(" SPARK_SUBMIT_JAVA_OPTS" )
54+
55+ assume(runner != null , " RUNNER must be set" )
56+ assume(classpath != null , " CLASSPATH must be set" )
57+ assume(javaOpts != null , " JAVA_OPTS must be set" )
58+ assume(defaultDriverMemory != null , " OUR_JAVA_MEM must be set" )
59+ assume(deployMode == " client" , " SPARK_SUBMIT_DEPLOY_MODE must be \" client\" !" )
60+ assume(propertiesFile != null , " SPARK_SUBMIT_PROPERTIES_FILE must be set" )
61+ assume(bootstrapDriver != null , " SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set!" )
6862
69- // In client deploy mode, parse the properties file for certain `spark.driver.*` configs.
70- // These configs encode java options, class paths, and library paths needed to launch the JVM.
63+ // Parse the properties file for the equivalent spark.driver.* configs
7164 val properties = SparkSubmitArguments .getPropertiesFromFile(new File (propertiesFile)).toMap
72- val confDriverMemory = properties.get(" spark.driver.memory" )
73- val confClassPaths = properties.get(" spark.driver.extraClassPath " )
74- val confLibraryPaths = properties.get(" spark.driver.extraLibraryPath " )
75- val confJavaOpts = properties.get(" spark.driver.extraJavaOptions" )
65+ val confDriverMemory = properties.get(" spark.driver.memory" ).getOrElse(defaultDriverMemory)
66+ val confLibraryPath = properties.get(" spark.driver.extraLibraryPath " ).getOrElse( " " )
67+ val confClasspath = properties.get(" spark.driver.extraClassPath " ).getOrElse( " " )
68+ val confJavaOpts = properties.get(" spark.driver.extraJavaOptions" ).getOrElse( " " )
7669
77- // Merge relevant command line values with the config equivalents, if any
78- val javaMemory = confDriverMemory.getOrElse(clJavaMemory)
79- val pathSeparator = sys.props(" path.separator" )
80- val classPaths = clClassPaths + confClassPaths.map(pathSeparator + _).getOrElse(" " )
81- val libraryPaths = clLibraryPaths + confLibraryPaths.map(pathSeparator + _).getOrElse(" " )
82- val javaOpts = clJavaOpts ++ confJavaOpts.map(Utils .splitCommandString).getOrElse(Seq .empty)
83- val filteredJavaOpts = javaOpts.distinct.filterNot { opt =>
84- opt.startsWith(" -Djava.library.path" ) || opt.startsWith(" -Xms" ) || opt.startsWith(" -Xmx" )
85- }
70+ // Favor Spark submit arguments over the equivalent configs in the properties file.
71+ // Note that we do not actually use the Spark submit values for library path, classpath,
72+ // and java opts here, because we have already captured them in BASH.
73+ val newDriverMemory = submitDriverMemory.getOrElse(confDriverMemory)
74+ val newLibraryPath =
75+ if (submitLibraryPath.isDefined) {
76+ // SPARK_SUBMIT_LIBRARY_PATH is already captured in JAVA_OPTS
77+ " "
78+ } else {
79+ " -Djava.library.path=" + confLibraryPath
80+ }
81+ val newClasspath =
82+ if (submitClasspath.isDefined) {
83+ // SPARK_SUBMIT_CLASSPATH is already captured in CLASSPATH
84+ classpath
85+ } else {
86+ classpath + sys.props(" path.separator" ) + confClasspath
87+ }
88+ val newJavaOpts =
89+ if (submitJavaOpts.isDefined) {
90+ // SPARK_SUBMIT_JAVA_OPTS is already captured in JAVA_OPTS
91+ javaOpts
92+ } else {
93+ javaOpts + " " + confJavaOpts
94+ }
8695
8796 // Build up command
8897 val command : Seq [String ] =
89- Seq (javaRunner) ++
90- { if (classPaths.nonEmpty) Seq (" -cp" , classPaths) else Seq .empty } ++
91- { if (libraryPaths.nonEmpty) Seq (s " -Djava.library.path= $libraryPaths" ) else Seq .empty } ++
92- filteredJavaOpts ++
93- Seq (s " -Xms $javaMemory" , s " -Xmx $javaMemory" ) ++
94- Seq (mainClass) ++
95- args.slice(7 , args.size)
96- val builder = new ProcessBuilder (command)
98+ Seq (runner) ++
99+ Seq (" -cp" , newClasspath) ++
100+ Seq (newLibraryPath) ++
101+ Utils .splitCommandString(newJavaOpts) ++
102+ Seq (s " -Xms $newDriverMemory" , s " -Xmx $newDriverMemory" ) ++
103+ Seq (" org.apache.spark.deploy.SparkSubmit" ) ++
104+ submitArgs
105+
106+ // Print the launch command. This follows closely the format used in `bin/spark-class`.
107+ if (sys.env.contains(" SPARK_PRINT_LAUNCH_COMMAND" )) {
108+ System .err.print(" Spark Command: " )
109+ System .err.println(command.mkString(" " ))
110+ System .err.println(" ========================================\n " )
111+ }
112+
113+ val filteredCommand = command.filter(_.nonEmpty)
114+ val builder = new ProcessBuilder (filteredCommand)
97115 val process = builder.start()
98116 new RedirectThread (System .in, process.getOutputStream, " redirect stdin" ).start()
99117 new RedirectThread (process.getInputStream, System .out, " redirect stdout" ).start()
0 commit comments