From 955a82c5a7b1fdee1d932bc52757598cca737823 Mon Sep 17 00:00:00 2001
From: chie8842
Date: Thu, 27 Oct 2016 03:54:15 +0900
Subject: [PATCH 01/11] added secure port configuration

---
 .../scala/org/apache/spark/SSLOptions.scala   |  5 +++
 .../org/apache/spark/ui/JettyUtils.scala      | 45 +++++++++++++------
 .../org/apache/spark/SSLOptionsSuite.scala    |  2 +
 docs/configuration.md                         |  7 +++
 4 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index be19179b00a49..efcec1134aec4 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -56,6 +56,7 @@ private[spark] case class SSLOptions(
     trustStorePassword: Option[String] = None,
     trustStoreType: Option[String] = None,
     protocol: Option[String] = None,
+    port: Int = 0,
     enabledAlgorithms: Set[String] = Set.empty)
     extends Logging {
@@ -147,6 +148,7 @@ private[spark] object SSLOptions extends Logging {
  * $ - `[ns].trustStorePassword` - a password to the trust-store file
  * $ - `[ns].trustStoreType` - the type of trust-store
  * $ - `[ns].protocol` - a protocol name supported by a particular Java version
+ * $ - `[ns].port` - a port number
  * $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
  *
  * For a list of protocols and ciphers supported by particular Java versions, you may go to
@@ -191,6 +193,8 @@ private[spark] object SSLOptions extends Logging {
     val protocol = conf.getOption(s"$ns.protocol")
       .orElse(defaults.flatMap(_.protocol))

+    val port = conf.getInt(s"$ns.port", defaultValue = defaults.map(_.port).getOrElse(0))
+
     val enabledAlgorithms = conf.getOption(s"$ns.enabledAlgorithms")
       .map(_.split(",").map(_.trim).filter(_.nonEmpty).toSet)
       .orElse(defaults.map(_.enabledAlgorithms))
@@ -207,6 +211,7 @@ private[spark] object SSLOptions extends Logging {
       trustStorePassword,
       trustStoreType,
       protocol,
+      port,
       enabledAlgorithms)
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 35c3c8d00f99b..d65e20f07f40a 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -308,21 +308,40 @@ private[spark] object JettyUtils extends Logging {
     sslOptions.createJettySslContextFactory().foreach { factory =>
       // If the new port wraps around, do not try a privileged port.
-      val securePort =
-        if (currentPort != 0) {
-          (currentPort + 400 - 1024) % (65536 - 1024) + 1024
-        } else {
-          0
-        }
-      val scheme = "https"
-      // Create a connector on port securePort to listen for HTTPS requests
-      val connector = new ServerConnector(server, factory)
-      connector.setPort(securePort)
-      connectors += connector
+      require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536))
+      val maxRetries = Utils.portMaxRetries(conf)

-      // redirect the HTTP requests to HTTPS port
-      collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
+      val scheme = "https"
+      // Create a connector on port securePort to listen for HTTPS requests
+      var offset = 0
+      var connected = false
+      while (offset < maxRetries && connected == false) {
+        val securePort =
+          if (currentPort != 0) {
+            if (1024 < sslOptions.port && sslOptions.port < 65536) {
+              sslOptions.port + offset
+            } else {
+              (currentPort + 400 - 1024) % (65536 - 1024) + 1024
+            }
+          } else {
+            0
+          }
+        val connector = new ServerConnector(server, factory)
+        connector.setPort(securePort)
+        connectors += connector
+        try{
+          // redirect the HTTP requests to HTTPS port
+          collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
+          connected = true
+        } catch {
+          case e: Exception =>
+            if (offset >= maxRetries) {
+              throw e
+            }
+        }
+        offset += 1
+      }
     }

     gzipHandlers.foreach(collection.addHandler)
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 159b448e05b02..aee242eade23c 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -113,6 +113,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
       "TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
     conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
     conf.set("spark.ssl.protocol", "SSLv3")
+    conf.set("spark.ssl.port", "18999")

     val defaultOpts = SSLOptions.parse(conf, "spark.ssl", defaults = None)
     val opts = SSLOptions.parse(conf, "spark.ui.ssl", defaults = Some(defaultOpts))
@@ -128,6 +129,7 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
     assert(opts.keyStorePassword === Some("12345"))
     assert(opts.keyPassword === Some("password"))
     assert(opts.protocol === Some("SSLv3"))
+    assert(opts.port === 18999)
     assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
   }
diff --git a/docs/configuration.md b/docs/configuration.md
index b07867d99aa9d..37faa54467382 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1663,6 +1663,13 @@ Apart from these, the following properties are also available, and may be useful
     page.
   </td>
 </tr>
+<tr>
+  <td><code>spark.ssl.port</code></td>
+  <td>None</td>
+  <td>
+    A port number hen connecting with SSL.
+  </td>
+</tr>
 <tr>
   <td><code>spark.ssl.needClientAuth</code></td>
   <td>false</td>

From 8ac0369cf292292bcbc26b957d975a50f94ee9af Mon Sep 17 00:00:00 2001
From: chie8842
Date: Thu, 27 Oct 2016 04:36:02 +0900
Subject: [PATCH 02/11] added configuration

---
 .../org/apache/spark/ui/JettyUtils.scala | 45 +++++++------------
 docs/configuration.md                    |  8 ++--
 2 files changed, 22 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index d65e20f07f40a..3dfd48e6a6130 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -310,38 +310,27 @@ private[spark] object JettyUtils extends Logging {
       // If the new port wraps around, do not try a privileged port.
       require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536))
-      val maxRetries = Utils.portMaxRetries(conf)
-      val scheme = "https"
-      // Create a connector on port securePort to listen for HTTPS requests
-      var offset = 0
-      var connected = false
-      while (offset < maxRetries && connected == false) {
-        val securePort =
-          if (currentPort != 0) {
-            if (1024 < sslOptions.port && sslOptions.port < 65536) {
-              sslOptions.port + offset
-            } else {
-              (currentPort + 400 - 1024) % (65536 - 1024) + 1024
-            }
+      val securePort =
+        if (currentPort != 0) {
+          if (1024 < sslOptions.port && sslOptions.port < 65536) {
+            sslOptions.port
           } else {
-            0
+            (currentPort + 400 - 1024) % (65536 - 1024) + 1024
           }
-        val connector = new ServerConnector(server, factory)
-        connector.setPort(securePort)
-        connectors += connector
-        try{
-          // redirect the HTTP requests to HTTPS port
-          collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
-          connected = true
-        } catch {
-          case e: Exception =>
-            if (offset >= maxRetries) {
-              throw e
+        } else {
+          0
         }
-        offset += 1
-      }
+      val scheme = "https"
+      // Create a connector on port securePort to listen for HTTPS requestswork.
+
+      val connector = new ServerConnector(server, factory)
+      connector.setPort(securePort)
+
+      connectors += connector
+
+      // redirect the HTTP requests to HTTPS port
+      collection.addHandler(createRedirectHttpsHandler(securePort, scheme))
     }

     gzipHandlers.foreach(collection.addHandler)
diff --git a/docs/configuration.md b/docs/configuration.md
index 37faa54467382..40b82f9d7c107 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1664,10 +1664,12 @@ Apart from these, the following properties are also available, and may be useful
 <tr>
-  <td><code>spark.ssl.port</code></td>
-  <td>None</td>
+  <td><code>spark.ssl..port</code></td>
+  <td>0</td>
   <td>
-    A port number hen connecting with SSL.
+    A port number hen connecting with SSL. Default value 0 means to be determined automatically.
+    Attention that the port should be separated for each particular protocol.
+    Not configure as spark.ssl.port but configure spark.ssl..port.
   </td>
 </tr>

From 47108b4b2e3bb799e839a9687a7f50a374ae9cbc Mon Sep 17 00:00:00 2001
From: chie8842
Date: Thu, 27 Oct 2016 22:59:58 +0900
Subject: [PATCH 03/11] change config description and fix some typos.

---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 2 +-
 docs/configuration.md                                    | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 3dfd48e6a6130..f2eab39426507 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -322,7 +322,7 @@ private[spark] object JettyUtils extends Logging {
           0
         }
       val scheme = "https"
-      // Create a connector on port securePort to listen for HTTPS requestswork.
+      // Create a connector on port securePort to listen for HTTPS requests.

       val connector = new ServerConnector(server, factory)
       connector.setPort(securePort)
diff --git a/docs/configuration.md b/docs/configuration.md
index 40b82f9d7c107..d156d645d32cb 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1598,7 +1598,6 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to enable SSL connections on all supported protocols.

-

    When <code>spark.ssl.enabled</code> is configured, <code>spark.ssl.protocol</code> is required.

@@ -1667,9 +1666,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.ssl..port</code></td>
   <td>0</td>
   <td>
-    A port number hen connecting with SSL. Default value 0 means to be determined automatically.
-    Attention that the port should be separated for each particular protocol.
-    Not configure as spark.ssl.port but configure spark.ssl..port.
+    Port number to listen on for SSL connections.
+    Default value of 0 means the port will be determined automatically.
   </td>
 </tr>

From 396460926390c10b2348de26333a76029413955c Mon Sep 17 00:00:00 2001
From: chie8842
Date: Thu, 27 Oct 2016 23:04:09 +0900
Subject: [PATCH 04/11] Return the unintended change

---
 docs/configuration.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index d156d645d32cb..d5a1b5ef63826 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1598,6 +1598,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to enable SSL connections on all supported protocols.

+

    When <code>spark.ssl.enabled</code> is configured, <code>spark.ssl.protocol</code> is required.

From 1e2c98566b63fd2d2e2800378aae118ffc3afb73 Mon Sep 17 00:00:00 2001
From: chie8842
Date: Fri, 4 Nov 2016 22:34:53 +0900
Subject: [PATCH 05/11] deleted unnecessary brank spaces

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index d5a1b5ef63826..3dee83c62ca7a 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1598,7 +1598,7 @@ Apart from these, the following properties are also available, and may be useful
   <td>false</td>
   <td>
     Whether to enable SSL connections on all supported protocols.

-
+

    When <code>spark.ssl.enabled</code> is configured, <code>spark.ssl.protocol</code> is required.

From 935415c5408f8e710016aca0b869bfaed1d7782d Mon Sep 17 00:00:00 2001
From: chie8842
Date: Sat, 5 Nov 2016 16:51:27 +0900
Subject: [PATCH 06/11] fixed documentation about particular protocol

---
 docs/configuration.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/configuration.md b/docs/configuration.md
index 3dee83c62ca7a..c02f5df0f6d34 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1664,7 +1664,7 @@ Apart from these, the following properties are also available, and may be useful
-  <td><code>spark.ssl..port</code></td>
+  <td><code>spark.ssl.port</code></td>
   <td>0</td>
   <td>
     Port number to listen on for SSL connections.

From 62aeb6c0d1aabb57419956c4e9e8dc943e05b54c Mon Sep 17 00:00:00 2001
From: chie8842
Date: Sun, 6 Nov 2016 11:12:20 +0900
Subject: [PATCH 07/11] added comment about particular protocols

---
 docs/configuration.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/configuration.md b/docs/configuration.md
index c02f5df0f6d34..fd8b00a4b5d64 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1669,6 +1669,7 @@ Apart from these, the following properties are also available, and may be useful
     Port number to listen on for SSL connections.
     Default value of 0 means the port will be determined automatically.
+    Attention that the port should be separated for each particular protocols.

From bd945aa95b15d9f1dddffa8bea24e68c7afc544f Mon Sep 17 00:00:00 2001
From: chie8842
Date: Thu, 10 Nov 2016 01:29:43 +0900
Subject: [PATCH 08/11] reflected some comments, and improved for trivial issues in ml library.

---
 core/src/main/scala/org/apache/spark/ui/JettyUtils.scala | 9 ++++++---
 docs/configuration.md                                    | 3 ++-
 .../org/apache/spark/ml/feature/VectorIndexer.scala      | 2 +-
 .../main/scala/org/apache/spark/ml/tree/treeParams.scala | 6 ++++--
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index f2eab39426507..f7ab3a64594ed 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -307,15 +307,18 @@ private[spark] object JettyUtils extends Logging {
     connectors += httpConnector

     sslOptions.createJettySslContextFactory().foreach { factory =>
-      // If the new port wraps around, do not try a privileged port.
-      require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536))
+      require(sslOptions.port == 0 || (1024 <= sslOptions.port && sslOptions.port < 65536),
+        "securePort should be between 1024 and 65535 (inclusive)," +
+        " or 0 for determined automatically.")

       val securePort =
         if (currentPort != 0) {
-          if (1024 < sslOptions.port && sslOptions.port < 65536) {
+          // If the new port wraps around, do not try a privileged port.
+          if (1024 <= sslOptions.port && sslOptions.port < 65536) {
             sslOptions.port
           } else {
+            // If the new port wraps around, do not try a privilege port
             (currentPort + 400 - 1024) % (65536 - 1024) + 1024
           }
         } else {
diff --git a/docs/configuration.md b/docs/configuration.md
index fd8b00a4b5d64..39f5009bf57f5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1667,7 +1667,8 @@ Apart from these, the following properties are also available, and may be useful
   <td>0</td>
   <td>
-    Port number to listen on for SSL connections.
+    Port number to listen on for SSL connections.
+    The SSL port should be between 1024 and 65535 (inclusive).
     Default value of 0 means the port will be determined automatically.
     Attention that the port should be separated for each particular protocols.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index d1a5c2e82581e..29a6ad528982d 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -66,7 +66,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
  * - This helps process a dataset of unknown vectors into a dataset with some continuous
  *   features and some categorical features. The choice between continuous and categorical
  *   is based upon a maxCategories parameter.
- * - Set maxCategories to the maximum number of categorical any categorical feature should have.
+ * - Set maxCategories to the maximum number of categories which categorical feature should have.
  * - E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}.
  *   If maxCategories = 2, then feature 0 will be declared categorical and use indices {0, 1},
  *   and feature 1 will be declared continuous.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
index 57c7e44e97607..2d71416ed78c1 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
@@ -85,8 +85,10 @@ private[ml] trait DecisionTreeParams extends PredictorParams
    * (default = 256 MB)
    * @group expertParam
    */
-  final val maxMemoryInMB: IntParam = new IntParam(this, "maxMemoryInMB",
-    "Maximum memory in MB allocated to histogram aggregation.",
+  final val maxMemoryInMB: IntParam = new IntParam(this, "maxMemoryInMB", "Maximum memory in MB" +
+    " allocated to histogram aggregation." +
+    " If too small, then 1 node will be split per iteration," +
+    " and its aggregates may exceed this size.",
     ParamValidators.gtEq(0))

   /**

From 9d69f02857a55c7b5407fd542cc463a54757e6df Mon Sep 17 00:00:00 2001
From: chie hayashida
Date: Thu, 10 Nov 2016 15:55:53 +0900
Subject: [PATCH 09/11] reflected comment

---
 .../main/scala/org/apache/spark/ui/JettyUtils.scala | 10 +++++-----
 docs/configuration.md                               |  2 +-
 .../org/apache/spark/ml/feature/VectorIndexer.scala |  2 +-
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index f7ab3a64594ed..a2049adf9f290 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -314,12 +314,12 @@ private[spark] object JettyUtils extends Logging {

       val securePort =
         if (currentPort != 0) {
-          // If the new port wraps around, do not try a privileged port.
-          if (1024 <= sslOptions.port && sslOptions.port < 65536) {
-            sslOptions.port
-          } else {
-            // If the new port wraps around, do not try a privilege port
+          if (sslOptions.port == 0) {
+            // If the new port wraps around, do not try a privileged port
             (currentPort + 400 - 1024) % (65536 - 1024) + 1024
+          } else {
+            // use sslOptions.port value as securePort
+            sslOptions.port
           }
         } else {
           0
         }
diff --git a/docs/configuration.md b/docs/configuration.md
index 39f5009bf57f5..2862f4e1ba34e 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -1670,7 +1670,7 @@ Apart from these, the following properties are also available, and may be useful
     Port number to listen on for SSL connections.
     The SSL port should be between 1024 and 65535 (inclusive).
     Default value of 0 means the port will be determined automatically.
-    Attention that the port should be separated for each particular protocols.
+    The port can be specified for services individually, with properties like spark.ssl.YYY.port.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index 29a6ad528982d..f6da1289aac09 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -66,7 +66,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
  * - This helps process a dataset of unknown vectors into a dataset with some continuous
  *   features and some categorical features. The choice between continuous and categorical
  *   is based upon a maxCategories parameter.
- * - Set maxCategories to the maximum number of categories which categorical feature should have.
+ * - Set maxCategories to the maximum number of categories that any categorical feature should have.
  * - E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}.
  *   If maxCategories = 2, then feature 0 will be declared categorical and use indices {0, 1},
  *   and feature 1 will be declared continuous.

From 30a292720641f7275bc2814fe78dfb1eec5143b5 Mon Sep 17 00:00:00 2001
From: chie8842
Date: Fri, 11 Nov 2016 10:29:24 +0900
Subject: [PATCH 10/11] rebased unrelated changes

---
 .../scala/org/apache/spark/ml/feature/VectorIndexer.scala | 2 +-
 .../main/scala/org/apache/spark/ml/tree/treeParams.scala  | 6 ++----
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index f6da1289aac09..d1a5c2e82581e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -66,7 +66,7 @@ private[ml] trait VectorIndexerParams extends Params with HasInputCol with HasOu
  * - This helps process a dataset of unknown vectors into a dataset with some continuous
  *   features and some categorical features. The choice between continuous and categorical
  *   is based upon a maxCategories parameter.
- * - Set maxCategories to the maximum number of categories that any categorical feature should have.
+ * - Set maxCategories to the maximum number of categorical any categorical feature should have.
  * - E.g.: Feature 0 has unique values {-1.0, 0.0}, and feature 1 values {1.0, 3.0, 5.0}.
  *   If maxCategories = 2, then feature 0 will be declared categorical and use indices {0, 1},
  *   and feature 1 will be declared continuous.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
index 2d71416ed78c1..57c7e44e97607 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/treeParams.scala
@@ -85,10 +85,8 @@ private[ml] trait DecisionTreeParams extends PredictorParams
    * (default = 256 MB)
    * @group expertParam
    */
-  final val maxMemoryInMB: IntParam = new IntParam(this, "maxMemoryInMB", "Maximum memory in MB" +
-    " allocated to histogram aggregation." +
-    " If too small, then 1 node will be split per iteration," +
-    " and its aggregates may exceed this size.",
+  final val maxMemoryInMB: IntParam = new IntParam(this, "maxMemoryInMB",
+    "Maximum memory in MB allocated to histogram aggregation.",
     ParamValidators.gtEq(0))

   /**

From 83df9bc1750212b4a8b02367c4df07cba35753ae Mon Sep 17 00:00:00 2001
From: chie hayashida
Date: Fri, 18 Nov 2016 10:52:45 +0900
Subject: [PATCH 11/11] solve conflict between global and specific configuration

---
 .../spark/deploy/rest/RestSubmissionServer.scala       |  2 +-
 .../network/netty/NettyBlockTransferService.scala      |  2 +-
 .../scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala |  2 +-
 .../main/scala/org/apache/spark/ui/JettyUtils.scala    |  2 +-
 core/src/main/scala/org/apache/spark/util/Utils.scala  | 10 ++++++----
 .../spark/deploy/master/PersistenceEngineSuite.scala   |  2 +-
 6 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
index b30c980e95a9a..8c9825e4a0041 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionServer.scala
@@ -78,7 +78,7 @@ private[spark] abstract class RestSubmissionServer(
    * Map the servlets to their corresponding contexts and attach them to a server.
    * Return a 2-tuple of the started server and the bound port.
    */
-  private def doStart(startPort: Int): (Server, Int) = {
+  private def doStart(startPort: Int, securePort: Int): (Server, Int) = {
     val threadPool = new QueuedThreadPool
     threadPool.setDaemon(true)
     val server = new Server(threadPool)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index dc70eb82d2b54..b31e020612883 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -76,7 +76,7 @@ private[spark] class NettyBlockTransferService(

   /** Creates and binds the TransportServer, possibly trying multiple ports.
    */
  private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = {
-    def startService(port: Int): (TransportServer, Int) = {
+    def startService(port: Int, securePort: Int): (TransportServer, Int) = {
       val server = transportContext.createServer(bindAddress, port, bootstraps.asJava)
       (server, server.getPort)
     }
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index e51649a1ecce9..22478f975612b 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -444,7 +444,7 @@ private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
       new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
         config.securityManager)
     if (!config.clientMode) {
-      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
+      val startNettyRpcEnv: (Int, Int) => (NettyRpcEnv, Int) = { (actualPort, securePort) =>
         nettyEnv.startServer(config.bindAddress, actualPort)
         (nettyEnv, nettyEnv.address.port)
       }
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index a2049adf9f290..d42c429a97a55 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -284,7 +284,7 @@ private[spark] object JettyUtils extends Logging {
     }

     // Bind to the given port, or throw a java.net.BindException if the port is occupied
-    def connect(currentPort: Int): (Server, Int) = {
+    def connect(currentPort: Int, securePort: Int = sslOptions.port): (Server, Int) = {
       val pool = new QueuedThreadPool
       if (serverName.nonEmpty) {
         pool.setName(serverName)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index bfc609419ccdb..2ff5063fe2244 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2142,9 +2142,10 @@ private[spark] object Utils extends Logging {
    */
   def startServiceOnPort[T](
       startPort: Int,
-      startService: Int => (T, Int),
+      startService: (Int, Int) => (T, Int),
       conf: SparkConf,
-      serviceName: String = ""): (T, Int) = {
+      serviceName: String = "",
+      securePort: Int = 0): (T, Int) = {

     require(startPort == 0 || (1024 <= startPort && startPort < 65536),
       "startPort should be between 1024 and 65535 (inclusive), or 0 for a random free port.")
@@ -2160,14 +2161,15 @@ private[spark] object Utils extends Logging {
         ((startPort + offset - 1024) % (65536 - 1024)) + 1024
       }
       try {
-        val (service, port) = startService(tryPort)
+        val (service, port) = startService(tryPort, securePort + offset)
         logInfo(s"Successfully started service$serviceString on port $port.")
         return (service, port)
       } catch {
         case e: Exception if isBindCollision(e) =>
           if (offset >= maxRetries) {
             val exceptionMessage = s"${e.getMessage}: Service$serviceString failed after " +
-              s"$maxRetries retries (starting from $startPort)! Consider explicitly setting " +
+              s"$maxRetries retries (starting from $startPort and $securePort)! " +
+              s"Consider explicitly setting " +
               s"the appropriate port for the service$serviceString (for example spark.ui.port " +
               s"for SparkUI) to an available port or increasing spark.port.maxRetries."
             val exception = new BindException(exceptionMessage)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
index 62fe0eaedfd27..58c11e59509ae 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/PersistenceEngineSuite.scala
@@ -119,7 +119,7 @@ class PersistenceEngineSuite extends SparkFunSuite {

   private def findFreePort(conf: SparkConf): Int = {
     val candidatePort = RandomUtils.nextInt(1024, 65536)
-    Utils.startServiceOnPort(candidatePort, (trialPort: Int) => {
+    Utils.startServiceOnPort(candidatePort, (trialPort: Int, securePort: Int) => {
       val socket = new ServerSocket(trialPort)
       socket.close()
       (null, trialPort)
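
Note on the behavior the series converges on: after PATCH 09, JettyUtils uses an explicit spark.ssl.port when one is configured, and otherwise derives the HTTPS port from the HTTP port, wrapping around within [1024, 65536) so it never lands on a privileged port; after PATCH 11, Utils.startServiceOnPort advances both the plain and secure ports by the same retry offset on bind collisions. The standalone sketch below reproduces that port arithmetic outside of Jetty so it can be checked in isolation. It is not part of the patch: SecurePortSketch and chooseSecurePort are illustrative names introduced here.

object SecurePortSketch {
  // Mirrors the securePort selection in JettyUtils after PATCH 09:
  // - sslPort == 0 means "determined automatically": derive from the HTTP
  //   port, wrapping within [1024, 65536) so privileged ports are skipped.
  // - an explicit sslPort in [1024, 65536) is used as-is.
  // - currentPort == 0 (random HTTP port) yields 0 (random HTTPS port).
  def chooseSecurePort(currentPort: Int, sslPort: Int): Int = {
    require(sslPort == 0 || (1024 <= sslPort && sslPort < 65536),
      "securePort should be between 1024 and 65535 (inclusive), or 0 for determined automatically.")
    if (currentPort != 0) {
      if (sslPort == 0) {
        // If the new port wraps around, do not try a privileged port.
        (currentPort + 400 - 1024) % (65536 - 1024) + 1024
      } else {
        sslPort
      }
    } else {
      0
    }
  }

  def main(args: Array[String]): Unit = {
    println(chooseSecurePort(4040, 0))      // 4440: HTTP port + 400
    println(chooseSecurePort(65500, 0))     // 1388: wraps past 65535, skipping ports below 1024
    println(chooseSecurePort(4040, 18999))  // 18999: explicit spark.ssl.port wins
  }
}

Under PATCH 11's startServiceOnPort change, a bind collision on attempt N calls startService(tryPort, securePort + N), so a fixed spark.ssl.port shifts in step with the plain port rather than being retried at the same conflicting value.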