18 changes: 4 additions & 14 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -40,12 +40,11 @@ import org.apache.ivy.plugins.matcher.GlobPatternMatcher
import org.apache.ivy.plugins.repository.file.FileRepository
import org.apache.ivy.plugins.resolver.{ChainResolver, FileSystemResolver, IBiblioResolver}

import org.apache.spark.{SPARK_REVISION, SPARK_VERSION, SparkException, SparkUserAppException}
import org.apache.spark.{SPARK_BRANCH, SPARK_BUILD_DATE, SPARK_BUILD_USER, SPARK_REPO_URL}
import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
import org.apache.spark.util._


/**
@@ -63,7 +62,7 @@ private[deploy] object SparkSubmitAction extends Enumeration {
* This program handles setting up the classpath with relevant Spark dependencies and provides
* a layer over the different cluster managers and deploy modes that Spark supports.
*/
object SparkSubmit {
object SparkSubmit extends CommandLineUtils {

// Cluster managers
private val YARN = 1
@@ -87,15 +86,6 @@ object SparkSubmit {
private val CLASS_NOT_FOUND_EXIT_STATUS = 101

// scalastyle:off println
// Exposed for testing
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)
private[spark] var printStream: PrintStream = System.err
private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)
private[spark] def printErrorAndExit(str: String): Unit = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}
private[spark] def printVersionAndExit(): Unit = {
printStream.println("""Welcome to
____ __
@@ -113,7 +103,7 @@
}
// scalastyle:on println

def main(args: Array[String]): Unit = {
override def main(args: Array[String]): Unit = {
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
@@ -412,10 +412,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
repositories = value

case CONF =>
value.split("=", 2).toSeq match {
case Seq(k, v) => sparkProperties(k) = v
case _ => SparkSubmit.printErrorAndExit(s"Spark config without '=': $value")
}
val (confName, confValue) = SparkSubmit.parseSparkConfProperty(value)
sparkProperties(confName) = confValue

case PROXY_USER =>
proxyUser = value
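For reference, here is a minimal, self-contained sketch of the key/value split that the shared parseSparkConfProperty helper (added in the new CommandLineUtils file below) is expected to perform; parsePair is an invented name, used only for illustration.

// Illustrative only: mirrors String.split("=", 2) with a limit of two,
// so only the first '=' separates the key from the value.
def parsePair(pair: String): (String, String) = pair.split("=", 2) match {
  case Array(k, v) => (k, v)
  case _ => sys.error(s"Spark config without '=': $pair")
}

parsePair("spark.executor.memory=4g")              // ("spark.executor.memory", "4g")
parsePair("spark.driver.extraJavaOptions=-Dx=y")   // ("spark.driver.extraJavaOptions", "-Dx=y")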
55 changes: 55 additions & 0 deletions core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util

import java.io.PrintStream

import org.apache.spark.SparkException

/**
* Contains basic command line parsing functionality and methods to parse
* some common Spark CLI options.
*/
private[spark] trait CommandLineUtils {

// Exposed for testing
private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)

private[spark] var printStream: PrintStream = System.err

// scalastyle:off println
private[spark] def printWarning(str: String): Unit = printStream.println("Warning: " + str)

Contributor: nit: remove extra empty line

Contributor Author (@skonto): ok.

Contributor Author (@skonto, Nov 17, 2016): Removed the extra empty line; the one still showing in the diff is the blank line used to separate the statements there.

private[spark] def printErrorAndExit(str: String): Unit = {
printStream.println("Error: " + str)
printStream.println("Run with --help for usage help or --verbose for debug output")
exitFn(1)
}

// scalastyle:on println
private[spark] def parseSparkConfProperty(pair: String): (String, String) = {
pair.split("=", 2).toSeq match {
case Seq(k, v) => (k, v)
case _ => printErrorAndExit(s"Spark config without '=': $pair")
throw new SparkException(s"Spark config without '=': $pair")
}
}

def main(args: Array[String]): Unit
}
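As a quick illustration of how the trait is meant to be consumed, here is a minimal, hypothetical entry point. The object name and package below are invented; since the trait and its members are private[spark], a real consumer has to live under org.apache.spark.

package org.apache.spark.example  // hypothetical: must sit under org.apache.spark for private[spark] access

import org.apache.spark.util.CommandLineUtils

object ExampleTool extends CommandLineUtils {
  override def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      // Prints "Error: ..." plus the usage hint to printStream, then calls exitFn(1).
      printErrorAndExit("expected at least one key=value argument")
    }
    // Splits on the first '=', or prints an error and exits if there is none.
    val (key, value) = parseSparkConfProperty(args.head)
    printStream.println(s"$key -> $value")
  }
}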
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1962,6 +1962,20 @@ private[spark] object Utils extends Logging {
path
}

/**
* Updates Spark config with properties from the given map.
* Provided properties have the highest priority.
*/
def updateSparkConfigFromProperties(
conf: SparkConf,
properties: Map[String, String]) : Unit = {
properties.filter { case (k, v) =>
k.startsWith("spark.")
Contributor: You can move this to the foreach body.

On a side note, this whole method feels a little funny, but I have to see how it's used.

Contributor Author (@skonto, Sep 22, 2016): I just re-used the code found here.

}.foreach { case (k, v) =>
conf.set(k, v)
}
}

/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
val file = new File(filename)
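A rough usage sketch of the new Utils helper follows; the object name and values are invented, and because the helper lives in the private[spark] Utils object, the sketch assumes a package under org.apache.spark.

package org.apache.spark.example  // hypothetical package, needed for private[spark] visibility

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object UpdateConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
    val props = Map(
      "spark.cores.max" -> "4",        // kept: key starts with "spark."
      "other.setting"   -> "ignored")  // filtered out by the helper
    Utils.updateSparkConfigFromProperties(conf, props)
    assert(conf.get("spark.cores.max") == "4")
    assert(!conf.contains("other.setting"))
  }
}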
43 changes: 26 additions & 17 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -34,21 +34,11 @@ import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{ResetSystemProperties, Utils}
import org.apache.spark.util.{CommandLineUtils, ResetSystemProperties, Utils}

// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
// of properties that needed to be cleared after tests.
class SparkSubmitSuite
extends SparkFunSuite
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties
with Timeouts {

override def beforeEach() {
super.beforeEach()
System.setProperty("spark.testing", "true")
}
trait TestPrematureExit {
suite: SparkFunSuite =>

private val noOpOutputStream = new OutputStream {
def write(b: Int) = {}
@@ -65,16 +55,19 @@
}

/** Returns true if the script exits and the given search string is printed. */
private def testPrematureExit(input: Array[String], searchString: String) = {
private[spark] def testPrematureExit(
input: Array[String],
searchString: String,
mainObject: CommandLineUtils = SparkSubmit) : Unit = {
val printStream = new BufferPrintStream()
SparkSubmit.printStream = printStream
mainObject.printStream = printStream

@volatile var exitedCleanly = false
SparkSubmit.exitFn = (_) => exitedCleanly = true
mainObject.exitFn = (_) => exitedCleanly = true

val thread = new Thread {
override def run() = try {
SparkSubmit.main(input)
mainObject.main(input)
} catch {
// If exceptions occur after the "exit" has happened, fine to ignore them.
// These represent code paths not reachable during normal execution.
@@ -88,6 +81,22 @@
fail(s"Search string '$searchString' not found in $joined")
}
}
}

// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
// of properties that needed to be cleared after tests.
class SparkSubmitSuite
extends SparkFunSuite
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties
with Timeouts
with TestPrematureExit {

override def beforeEach() {
super.beforeEach()
System.setProperty("spark.testing", "true")
Contributor: You shouldn't need to do this. Otherwise it would mean a bug in ResetSystemProperties.

Contributor Author (@skonto, Sep 22, 2016): It is not my code; it was there before, as it is now. Have a look and let me know what I should do with it.

}

// scalastyle:off println
test("prints usage on empty input") {
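To illustrate why the premature-exit harness was factored into a trait, here is a hypothetical suite (not part of this PR) that reuses it. The class name is invented; mainObject defaults to SparkSubmit and could be swapped for any other CommandLineUtils object.

package org.apache.spark.deploy  // hypothetical suite, placed next to SparkSubmitSuite

import org.apache.spark.SparkFunSuite

class MalformedConfSuite extends SparkFunSuite with TestPrematureExit {
  test("spark-submit rejects a --conf entry without '='") {
    // The harness swaps in a buffering printStream and a no-op exitFn, runs
    // mainObject.main on a separate thread, and asserts the message was printed.
    testPrematureExit(
      Array("--conf", "spark.missing.equals.sign"),
      "Spark config without '='")
  }
}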
@@ -24,7 +24,7 @@ import org.apache.spark.deploy.mesos.ui.MesosClusterUI
import org.apache.spark.deploy.rest.mesos.MesosRestServer
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.cluster.mesos._
import org.apache.spark.util.{ShutdownHookManager, Utils}
import org.apache.spark.util.{CommandLineUtils, ShutdownHookManager, Utils}

/*
* A dispatcher that is responsible for managing and launching drivers, and is intended to be
@@ -91,8 +91,10 @@ private[mesos] class MesosClusterDispatcher(
}
}

private[mesos] object MesosClusterDispatcher extends Logging {
def main(args: Array[String]) {
private[mesos] object MesosClusterDispatcher
extends Logging
with CommandLineUtils {
override def main(args: Array[String]) {
Utils.initDaemon(log)
val conf = new SparkConf
val dispatcherArgs = new MesosClusterDispatcherArguments(args, conf)
@@ -18,23 +18,43 @@
package org.apache.spark.deploy.mesos

import scala.annotation.tailrec
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.util.{IntParam, Utils}

import org.apache.spark.SparkConf

private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var name = "Spark Cluster"
var webUiPort = 8081
var host: String = Utils.localHostName()
var port: Int = 7077
var name: String = "Spark Cluster"
var webUiPort: Int = 8081
var verbose: Boolean = false
var masterUrl: String = _
var zookeeperUrl: Option[String] = None
var propertiesFile: String = _
val confProperties: mutable.HashMap[String, String] =
new mutable.HashMap[String, String]()

parse(args.toList)

// scalastyle:on println
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)
Utils.updateSparkConfigFromProperties(conf, confProperties)
Member: This seems like it differs between the Mesos and non-Mesos implementations. Here you can update props with --conf, but wouldn't this be identical elsewhere? I may have missed the purpose.

Contributor Author (@skonto, Sep 19, 2016): The logic differs from the others (I would have to refactor everything otherwise, which is not wise given the code size for now). I just added support for the --conf option, which is the same across implementations, and re-used common code for the parsing.

Member: I guess I mean: why isn't this same code then called elsewhere? You know more about this than I do, but I would expect more parallel similarity between the Mesos and non-Mesos code for supporting the same functionality. There's a method factored out here but not reused. Marcelo may understand this more anyway.

Contributor Author (@skonto, Sep 21, 2016): Non-Mesos code either does not support --conf or does not need to set the Spark configuration in all cases. Check here and here. If you compare the same functionality (loading the properties file) with the MesosClusterDispatcherArguments file, things are completely different, even before my PR. Check here.

So I don't expect any parallel similarity since, for example, in this case ApplicationMaster does not need to get the Spark configuration into a SparkConf.


// scalastyle:off println
if (verbose) {
MesosClusterDispatcher.printStream.println(s"Using host: $host")
MesosClusterDispatcher.printStream.println(s"Using port: $port")
MesosClusterDispatcher.printStream.println(s"Using webUiPort: $webUiPort")
MesosClusterDispatcher.printStream.println(s"Framework Name: $name")

Option(propertiesFile).foreach { file =>
MesosClusterDispatcher.printStream.println(s"Using properties file: $file")
}

MesosClusterDispatcher.printStream.println(s"Spark Config properties set:")
conf.getAll.foreach(println)
}

@tailrec
private def parse(args: List[String]): Unit = args match {
@@ -58,9 +78,10 @@ private[mesos] class MesosClusterDispatcherArguments(args: Array[String], conf:
case ("--master" | "-m") :: value :: tail =>
if (!value.startsWith("mesos://")) {
// scalastyle:off println
System.err.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
MesosClusterDispatcher.printStream
.println("Cluster dispatcher only supports mesos (uri begins with mesos://)")
// scalastyle:on println
System.exit(1)
MesosClusterDispatcher.exitFn(1)
}
masterUrl = value.stripPrefix("mesos://")
parse(tail)
@@ -73,37 +94,56 @@
propertiesFile = value
parse(tail)

case ("--conf") :: value :: tail =>
val pair = MesosClusterDispatcher.
parseSparkConfProperty(value)
confProperties(pair._1) = pair._2
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)
printUsageAndExit(0)

case ("--verbose") :: tail =>
verbose = true
parse(tail)

case Nil =>
if (masterUrl == null) {
if (Option(masterUrl).isEmpty) {
// scalastyle:off println
System.err.println("--master is required")
MesosClusterDispatcher.printStream.println("--master is required")
// scalastyle:on println
printUsageAndExit(1)
}

case _ =>
case value =>
// scalastyle:off println
MesosClusterDispatcher.printStream.println(s"Unrecognized option: '${value.head}'")
Contributor: Does import MesosClusterDispatcher._ work to get these methods? Kinda noisy to see the class name repeated everywhere.

Contributor Author (@skonto, Sep 22, 2016): I guess it is noisy here as well; I counted 34 appearances of SparkSubmit. I just followed the same idea: since that was previously reviewed I thought I should do the same, guessing it was done for reasons of clarity.

// scalastyle:on println
printUsageAndExit(1)
}

private def printUsageAndExit(exitCode: Int): Unit = {
val outStream = MesosClusterDispatcher.printStream

// scalastyle:off println
System.err.println(
outStream.println(
"Usage: MesosClusterDispatcher [options]\n" +
"\n" +
"Options:\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" --help Show this help message and exit.\n" +
" --verbose, Print additional debug output.\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port WEBUI_PORT WebUI Port to listen on (default: 8081)\n" +
" --name NAME Framework name to show in Mesos UI\n" +
" -m --master MASTER URI for connecting to Mesos master\n" +
" -z --zk ZOOKEEPER Comma delimited URLs for connecting to \n" +
" Zookeeper for persistence\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
" Default is conf/spark-defaults.conf \n" +
" --conf PROP=VALUE Arbitrary Spark configuration property.\n" +
" Takes precedence over defined properties in properties-file.")
// scalastyle:on println
System.exit(exitCode)
MesosClusterDispatcher.exitFn(exitCode)
}
}
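Finally, a rough sketch of what the new --conf path gives the dispatcher; the values are invented, and since MesosClusterDispatcherArguments is private[mesos], the snippet assumes it runs inside org.apache.spark.deploy.mesos.

// Hypothetical snippet, assumed to live under org.apache.spark.deploy.mesos.
import org.apache.spark.SparkConf

val conf = new SparkConf(loadDefaults = false)
val args = Array(
  "--master", "mesos://zk://host:2181/mesos",
  "--conf", "spark.mesos.dispatcher.webui.url=http://proxy.example.com")

// parse() collects the --conf pair into confProperties; the constructor then loads
// spark-defaults.conf (if any) and applies confProperties on top, so --conf wins.
new MesosClusterDispatcherArguments(args, conf)
assert(conf.get("spark.mesos.dispatcher.webui.url") == "http://proxy.example.com")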