diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala
index 3b9c885bf97a..fa38895fca32 100644
--- a/core/src/main/scala/org/apache/spark/SSLOptions.scala
+++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala
@@ -24,6 +24,10 @@ import javax.net.ssl.SSLContext
import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
+
+import org.apache.spark.network.util.ConfigProvider
+import org.apache.spark.network.util.ssl.SSLFactory
+
import org.eclipse.jetty.util.ssl.SslContextFactory
/**
@@ -34,23 +38,38 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* by the protocol, which it can generate the configuration for. Since Akka doesn't support client
* authentication with SSL, SSLOptions cannot support it either.
*
- * @param enabled enables or disables SSL; if it is set to false, the rest of the
- * settings are disregarded
- * @param keyStore a path to the key-store file
- * @param keyStorePassword a password to access the key-store file
- * @param keyPassword a password to access the private key in the key-store
- * @param trustStore a path to the trust-store file
- * @param trustStorePassword a password to access the trust-store file
- * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
- * @param enabledAlgorithms a set of encryption algorithms that may be used
+ * @param nameSpace the configuration namespace
+ * @param enabled enables or disables SSL;
+ * if it is set to false, the rest of the settings are disregarded
+ * @param keyStore a path to the key-store file
+ * @param keyStorePassword a password to access the key-store file
+ * @param privateKey a PKCS#8 private key file in PEM format
+ * @param keyPassword a password to access the private key in the key-store
+ * @param certChain an X.509 certificate chain file in PEM format
+ * @param trustStore a path to the trust-store file
+ * @param trustStorePassword a password to access the trust-store file
 * @param trustStoreReloadingEnabled enables or disables using a trust-store that
 *                      reloads its configuration when the trust-store file on disk changes
 * @param trustStoreReloadInterval the interval, in milliseconds, at which
 *                      the trust-store will reload its configuration
 * @param openSslEnabled enables or disables using an OpenSSL implementation (if
 *                      available on the host system); requires certChain and privateKey
+ * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
+ * @param enabledAlgorithms a set of encryption algorithms that may be used
*/
private[spark] case class SSLOptions(
+ nameSpace: Option[String] = None,
enabled: Boolean = false,
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
+ privateKey: Option[File] = None,
keyPassword: Option[String] = None,
+ certChain: Option[File] = None,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
+ trustStoreReloadingEnabled: Boolean = false,
+ trustStoreReloadInterval: Int = 10000,
+ openSslEnabled: Boolean = false,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
@@ -76,6 +95,30 @@ private[spark] case class SSLOptions(
}
}
+  /**
+   * Creates an SSLFactory based on these options, when SSL is enabled.
+   * @return Some(SSLFactory) if `enabled` is true, None otherwise
+   */
+ def createSSLFactory: Option[SSLFactory] = {
+ if (enabled) {
+ Some(new SSLFactory.Builder()
+      .requestedProtocol(protocol.orNull)
+      .requestedCiphers(enabledAlgorithms.toArray)
+      .keyStore(keyStore.orNull, keyStorePassword.orNull)
+      .privateKey(privateKey.orNull)
+      .keyPassword(keyPassword.orNull)
+      .certChain(certChain.orNull)
+      .trustStore(
+        trustStore.orNull,
+        trustStorePassword.orNull,
+        trustStoreReloadingEnabled,
+        trustStoreReloadInterval)
+ .build())
+ } else {
+ None
+ }
+ }
+
/**
* Creates an Akka configuration object which contains all the SSL settings represented by this
* object. It can be used then to compose the ultimate Akka configuration.
@@ -136,11 +179,69 @@ private[spark] case class SSLOptions(
enabledAlgorithms & providerAlgorithms
}
+  /**
+   * Implicit helper class that enables string-interpolation pattern matching,
+   * e.g. `case ns"$nsp.enabled" => ...`.
+   * @param sc the StringContext being enriched
+   */
+  implicit private class NsContext(val sc: StringContext) {
+    object ns {
+      def apply(args: Any*): String = sc.s(args: _*)
+      def unapplySeq(s: String): Option[Seq[String]] = {
+        val regexp = sc.parts.mkString("(.+)").r
+        regexp.unapplySeq(s)
+      }
+    }
+  }
+
+  /**
+   * Builds a ConfigProvider that resolves settings in this namespace from these
+   * SSLOptions and delegates all other lookups to the given SparkConf.
+   */
+ def createConfigProvider(conf: SparkConf): ConfigProvider = {
+ val nsp = nameSpace.getOrElse("spark.ssl")
+ new ConfigProvider() {
+ override def get(name: String): String = conf.get(name)
+ override def getBoolean(name: String, defaultValue: Boolean): Boolean = {
+ name match {
+ case ns"$nsp.enabled" => enabled
+ case ns"$nsp.trustStoreReloadingEnabled" => trustStoreReloadingEnabled
+ case ns"$nsp.openSslEnabled" => openSslEnabled
+ case _ => conf.getBoolean(name, defaultValue)
+ }
+ }
+
+ override def getInt(name: String, defaultValue: Int): Int = {
+ name match {
+ case ns"$nsp.trustStoreReloadInterval" => trustStoreReloadInterval
+ case _ => conf.getInt(name, defaultValue)
+ }
+ }
+
+ override def get(name: String, defaultValue: String): String = {
+ name match {
+ case ns"$nsp.keyStore" => keyStore.fold(defaultValue)(_.getAbsolutePath)
+ case ns"$nsp.keyStorePassword" => keyStorePassword.getOrElse(defaultValue)
+ case ns"$nsp.privateKey" => privateKey.fold(defaultValue)(_.getAbsolutePath)
+ case ns"$nsp.keyPassword" => keyPassword.getOrElse(defaultValue)
+ case ns"$nsp.certChain" => certChain.fold(defaultValue)(_.getAbsolutePath)
+ case ns"$nsp.trustStore" => trustStore.fold(defaultValue)(_.getAbsolutePath)
+ case ns"$nsp.trustStorePassword" => trustStorePassword.getOrElse(defaultValue)
+ case ns"$nsp.protocol" => protocol.getOrElse(defaultValue)
+ case ns"$nsp.enabledAlgorithms" => enabledAlgorithms.mkString(",")
+ case _ => conf.get(name, defaultValue)
+ }
+ }
+ }
+ }
+
/** Returns a string representation of this SSLOptions with all the passwords masked. */
override def toString: String = s"SSLOptions{enabled=$enabled, " +
- s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
- s"trustStore=$trustStore, trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
- s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
+ s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +
+    s"privateKey=$privateKey, certChain=$certChain, trustStore=$trustStore, " +
+ s"trustStorePassword=${trustStorePassword.map(_ => "xxx")}, " +
+ s"openSslEnabled=$openSslEnabled, trustStoreReloadingEnabled=$trustStoreReloadingEnabled, " +
+ s"protocol=$protocol, enabledAlgorithms=$enabledAlgorithms}"
}
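
For reference, a self-contained sketch of the interpolation-pattern trick used by `createConfigProvider` above, as I read its semantics: the literal parts of the pattern become a regex with `(.+)` capture groups, so the matcher binds whatever precedes the literal suffix. This is illustration only, not part of the patch:

```scala
object NsDemo {
  // Same shape as the NsContext helper in this patch.
  implicit class NsContext(val sc: StringContext) {
    object ns {
      def unapplySeq(s: String): Option[Seq[String]] =
        sc.parts.mkString("(.+)").r.unapplySeq(s)
    }
  }

  def main(args: Array[String]): Unit = {
    // ns"$prefix.keyStore" compiles to the regex "(.+).keyStore",
    // so prefix binds to "spark.ssl.bts" here.
    "spark.ssl.bts.keyStore" match {
      case ns"$prefix.keyStore" => println(prefix)
      case _ => println("no match")
    }
  }
}
```
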
@@ -150,12 +251,21 @@ private[spark] object SSLOptions extends Logging {
*
* The following settings are allowed:
* $ - `[ns].enabled` - `true` or `false`, to enable or disable SSL respectively
- * $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
+ * $ - `[ns].keyStore` - a path to the key-store file; can be relative
+ * to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
+ * $ - `[ns].privateKey` - a PKCS#8 private key file in PEM format
* $ - `[ns].keyPassword` - a password to the private key
- * $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
- * directory
+ * $ - `[ns].certChain` - an X.509 certificate chain file in PEM format
+ * $ - `[ns].trustStore` - a path to the trust-store file; can be relative
+ * to the current directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
 *        $ - `[ns].trustStoreReloadingEnabled` - enables or disables using a trust-store
 *        that reloads its configuration when the trust-store file on disk changes
 *        $ - `[ns].trustStoreReloadInterval` - the interval, in milliseconds, at which the
 *        trust-store will reload its configuration
 *        $ - `[ns].openSslEnabled` - enables or disables using an OpenSSL implementation
 *        (if available on host system); requires the certChain and privateKey settings
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
@@ -180,15 +290,27 @@ private[spark] object SSLOptions extends Logging {
val keyStorePassword = conf.getOption(s"$ns.keyStorePassword")
.orElse(defaults.flatMap(_.keyStorePassword))
+ val privateKey = conf.getOption(s"$ns.privateKey").map(new File(_))
+ .orElse(defaults.flatMap(_.privateKey))
+
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))
+ val certChain = conf.getOption(s"$ns.certChain").map(new File(_))
+ .orElse(defaults.flatMap(_.certChain))
+
val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))
val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))
+ val trustStoreReloadingEnabled = conf.getBoolean(s"$ns.trustStoreReloadingEnabled", false)
+
+ val trustStoreReloadInterval = conf.getInt(s"$ns.trustStoreReloadInterval", 10000)
+
+ val openSslEnabled = conf.getBoolean(s"$ns.openSslEnabled", false)
+
val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))
@@ -198,12 +320,18 @@ private[spark] object SSLOptions extends Logging {
.getOrElse(Set.empty)
new SSLOptions(
+ Some(ns),
enabled,
keyStore,
keyStorePassword,
+ privateKey,
keyPassword,
+ certChain,
trustStore,
trustStorePassword,
+ trustStoreReloadingEnabled,
+ trustStoreReloadInterval,
+ openSslEnabled,
protocol,
enabledAlgorithms)
}
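
Taken together, the intended flow (a sketch; paths and passwords are placeholders) is to parse a namespace into an SSLOptions and then hand the result to the Netty layer, either as an SSLFactory or as a ConfigProvider:

```scala
import org.apache.spark.{SparkConf, SSLOptions}

val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.keyStore", "/path/to/keystore")     // placeholder path
  .set("spark.ssl.keyStorePassword", "password")
  .set("spark.ssl.trustStore", "/path/to/truststore") // placeholder path
  .set("spark.ssl.trustStorePassword", "password")
  .set("spark.ssl.protocol", "TLSv1.2")

// Parse the global namespace into the common representation...
val opts = SSLOptions.parse(conf, "spark.ssl")

// ...then either build an SSLFactory directly (None when SSL is disabled),
val factory = opts.createSSLFactory
// ...or expose the options to the transport layer as a ConfigProvider.
val provider = opts.createConfigProvider(conf)
```
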
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 64e483e38477..4badb1ca3e84 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -101,12 +101,12 @@ import org.apache.spark.util.Utils
* over the wire in plaintext.
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
- * Spark currently supports "auth" for the quality of protection, which means
- * the connection does not support integrity or privacy protection (encryption)
- * after authentication. SASL also supports "auth-int" and "auth-conf" which
- * SPARK could support in the future to allow the user to specify the quality
- * of protection they want. If we support those, the messages will also have to
- * be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
+ * Spark currently supports "auth" for the quality of protection, and if configured,
+ * privacy protection (SSL/TLS encryption) for the Netty based implementation.
+ * SASL also supports "auth-int" and "auth-conf" which SPARK could support
+ * in the future to allow the user to specify the quality of protection they want.
+ * If we support those, the messages will also have to be wrapped and unwrapped
+ * via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the NioBlockTransferService does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
@@ -169,7 +169,8 @@ import org.apache.spark.util.Utils
* configuration for the particular protocol, the properties must be overwritten in the
* protocol-specific namespace. Use `spark.ssl.yyy.xxx` settings to overwrite the global
* configuration for particular protocol denoted by `yyy`. Currently `yyy` can be either `akka` for
- * Akka based connections or `fs` for broadcast and file server.
+ * Akka based connections, `fs` for broadcast and file server, or `bts` for the Netty based
+ * block transfer service.
*
* Refer to [[org.apache.spark.SSLOptions]] documentation for the list of
* options that can be specified.
@@ -177,7 +178,7 @@ import org.apache.spark.util.Utils
* SecurityManager initializes SSLOptions objects for different protocols separately. SSLOptions
* object parses Spark configuration at a given namespace and builds the common representation
* of SSL settings. SSLOptions is then used to provide protocol-specific configuration like
- * TypeSafe configuration for Akka or SSLContextFactory for Jetty.
+ * TypeSafe configuration for Akka, SSLContextFactory for Jetty, or SSLContext for Netty.
*
* SSL must be configured on each node and configured for each component involved in
* communication using the particular protocol. In YARN clusters, the key-store can be prepared on
@@ -248,9 +249,11 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))
+ val btsSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.bts", Some(defaultSSLOptions))
logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")
+ logDebug(s"SSLConfiguration for block transfer service (Netty): $btsSSLOptions")
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
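
As a sketch of the fallback behavior described above, a `spark.ssl.bts.*` key overrides its `spark.ssl.*` counterpart while unset keys inherit the defaults:

```scala
import org.apache.spark.{SparkConf, SSLOptions}

val conf = new SparkConf()
  .set("spark.ssl.enabled", "true")
  .set("spark.ssl.protocol", "TLSv1.2")
  .set("spark.ssl.bts.trustStoreReloadingEnabled", "true") // bts-only override

val defaults = SSLOptions.parse(conf, "spark.ssl")
val btsOpts = SSLOptions.parse(conf, "spark.ssl.bts", Some(defaults))
// btsOpts.protocol == Some("TLSv1.2")         (inherited from spark.ssl)
// btsOpts.trustStoreReloadingEnabled == true  (bts-specific override)
```
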
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
index c514a1a86bab..809012029e1c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala
@@ -45,8 +45,8 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana
private val port = sparkConf.getInt("spark.shuffle.service.port", 7337)
private val useSasl: Boolean = securityManager.isAuthenticationEnabled()
- private val transportConf =
- SparkTransportConf.fromSparkConf(sparkConf, "shuffle", numUsableCores = 0)
+ private val transportConf = SparkTransportConf.fromSparkConf(
+ sparkConf, "shuffle", numUsableCores = 0, Some(securityManager.btsSSLOptions))
private val blockHandler = newShuffleBlockHandler(transportConf)
private val transportContext: TransportContext =
new TransportContext(transportConf, blockHandler, true)
diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 40604a4da18d..a1b294864981 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -44,7 +44,8 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage
// TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
private val serializer = new JavaSerializer(conf)
private val authEnabled = securityManager.isAuthenticationEnabled()
- private val transportConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numCores)
+ private val transportConf = SparkTransportConf.fromSparkConf(
+ conf, "shuffle", numCores, Some(securityManager.btsSSLOptions))
private[this] var transportContext: TransportContext = _
private[this] var server: TransportServer = _
diff --git a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
index 84833f59d7af..1a3b469b4413 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/SparkTransportConf.scala
@@ -17,7 +17,7 @@
package org.apache.spark.network.netty
-import org.apache.spark.SparkConf
+import org.apache.spark.{SSLOptions, SparkConf}
import org.apache.spark.network.util.{TransportConf, ConfigProvider}
/**
@@ -45,8 +45,13 @@ object SparkTransportConf {
* @param numUsableCores if nonzero, this will restrict the server and client threads to only
* use the given number of cores, rather than all of the machine's cores.
* This restriction will only occur if these properties are not already set.
+ * @param sslOptions SSL config options
*/
- def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
+ def fromSparkConf(
+ _conf: SparkConf,
+ module: String,
+ numUsableCores: Int = 0,
+ sslOptions: Option[SSLOptions] = None): TransportConf = {
val conf = _conf.clone
// Specify thread configuration based on our JVM's allocation of cores (rather than necessarily
@@ -56,9 +61,9 @@ object SparkTransportConf {
conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
- new TransportConf(module, new ConfigProvider {
+ new TransportConf(module, sslOptions.fold(new ConfigProvider {
override def get(name: String): String = conf.get(name)
- })
+ })(_.createConfigProvider(conf)))
}
/**
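
A brief note on the `fold` above: when no SSLOptions are supplied, callers get the original plain ConfigProvider; otherwise the provider answers SSL queries from the parsed options. A hypothetical call site (module name and core count are illustrative only):

```scala
val transportConf = SparkTransportConf.fromSparkConf(
  sparkConf, "shuffle", numUsableCores = 4, Some(securityManager.btsSSLOptions))
```
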
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index b5b7804d54ce..15f13663be81 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -122,7 +122,8 @@ private[spark] class BlockManager(
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
- val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
+ val transConf = SparkTransportConf.fromSparkConf(
+ conf, "shuffle", numUsableCores, Some(securityManager.btsSSLOptions))
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled())
} else {
diff --git a/core/src/test/resources/certchain.pem b/core/src/test/resources/certchain.pem
new file mode 100644
index 000000000000..51cbe6ac607a
--- /dev/null
+++ b/core/src/test/resources/certchain.pem
@@ -0,0 +1,73 @@
+Certificate:
+ Data:
+ Version: 3 (0x2)
+ Serial Number: 1362775461 (0x513a4da5)
+ Signature Algorithm: sha256WithRSAEncryption
+ Issuer: C=Unknown, ST=Unknown, L=Unknown, O=Unknown, OU=Unknown, CN=Unknown
+ Validity
+ Not Before: Oct 8 23:10:34 2014 GMT
+ Not After : Jan 6 23:10:34 2015 GMT
+ Subject: C=Unknown, ST=Unknown, L=Unknown, O=Unknown, OU=Unknown, CN=Unknown
+ Subject Public Key Info:
+ Public Key Algorithm: rsaEncryption
+ RSA Public Key: (2048 bit)
+ Modulus (2048 bit):
+ 00:83:4d:72:ce:33:08:4d:fc:f4:a2:41:da:0e:e1:
+ 23:23:bd:59:f9:ed:48:60:7e:8c:65:72:d8:83:56:
+ ac:9c:11:99:c6:39:fb:19:6e:b2:f7:77:58:13:7c:
+ 9c:d8:6e:5a:b0:93:f4:4d:95:cb:be:b9:f9:8f:64:
+ 3c:d9:86:79:4d:d3:17:d5:aa:82:64:dc:12:76:85:
+ c8:fa:d2:3b:87:cf:86:dd:a6:16:93:b3:7b:67:6f:
+ 4b:2a:8d:27:ef:7e:28:14:2b:08:ca:dd:3e:b9:63:
+ cd:d4:e3:24:0d:df:5a:10:db:43:32:90:e0:81:56:
+ cb:35:ae:2c:d6:e9:5c:31:e3:89:bb:58:c2:95:13:
+ f2:af:8e:94:46:29:d9:11:67:05:0c:ba:e0:86:cb:
+ b3:ea:c6:51:a9:8c:b6:99:c2:ed:de:3f:aa:ef:c1:
+ b5:b6:aa:07:4b:76:0c:52:3f:43:47:96:4b:c9:75:
+ 57:2c:e9:ea:ed:6b:4e:be:d9:8d:b1:3f:f9:35:41:
+ 49:65:ee:ea:8f:5c:d0:98:46:2f:dc:6a:71:9e:a9:
+ 6e:41:cf:71:9e:49:43:b1:e3:b1:8c:c5:ae:47:49:
+ d2:b1:f1:67:97:75:35:9a:07:63:e0:e6:cd:07:23:
+ 99:98:9e:5c:41:23:56:4c:1c:3e:ef:09:ed:a2:f0:
+ da:a7
+ Exponent: 65537 (0x10001)
+ X509v3 extensions:
+ X509v3 Subject Key Identifier:
+ 02:25:CC:18:00:FA:F9:10:EA:07:CA:D5:A3:B3:5E:EB:17:EE:51:11
+ Signature Algorithm: sha256WithRSAEncryption
+ 38:1d:2e:62:c1:34:84:08:7b:bc:11:5e:bf:b3:9f:c6:44:b8:
+ 1d:4c:c9:48:f4:f7:94:5c:58:8b:ed:33:20:4a:c8:e8:7d:ca:
+ 8c:97:ac:93:e8:ce:91:1b:97:d9:06:01:c0:6e:b9:bf:d0:e8:
+ ca:f1:0e:46:f6:9b:5d:26:08:eb:0b:ea:99:97:05:3c:35:81:
+ 6f:38:45:1c:0e:f5:94:a6:ec:c0:76:44:26:84:d4:9d:d5:cf:
+ 24:6d:89:74:62:d0:04:13:75:5c:6e:ea:cc:8b:8d:d3:0f:c3:
+ ea:bb:b3:3b:4f:ec:d4:26:36:43:c1:2b:98:4e:75:ab:b2:78:
+ ab:75:56:c3:0d:cf:78:c9:4b:03:85:04:55:f7:11:e0:25:d7:
+ ff:0f:ee:18:a4:97:ab:b0:37:fe:aa:3e:8c:55:42:36:b2:dd:
+ 6b:c7:c9:1a:ab:8d:c6:dc:07:ed:88:c6:ff:87:c1:f0:c0:04:
+ 53:15:fc:12:6f:7c:0f:53:2b:57:42:b3:69:ad:e7:21:61:42:
+ 40:f6:cf:12:c5:73:35:ce:4e:38:7e:25:ef:2c:e4:78:e1:bc:
+ ad:2d:99:ec:9a:6d:c8:88:8f:74:d0:a5:b3:36:06:67:fe:d2:
+ 6b:ac:c5:dd:bc:30:a5:20:06:b6:c3:3d:fd:34:86:e3:64:93:
+ 93:84:2c:fe
+-----BEGIN CERTIFICATE-----
+MIIDdzCCAl+gAwIBAgIEUTpNpTANBgkqhkiG9w0BAQsFADBsMRAwDgYDVQQGEwdV
+bmtub3duMRAwDgYDVQQIEwdVbmtub3duMRAwDgYDVQQHEwdVbmtub3duMRAwDgYD
+VQQKEwdVbmtub3duMRAwDgYDVQQLEwdVbmtub3duMRAwDgYDVQQDEwdVbmtub3du
+MB4XDTE0MTAwODIzMTAzNFoXDTE1MDEwNjIzMTAzNFowbDEQMA4GA1UEBhMHVW5r
+bm93bjEQMA4GA1UECBMHVW5rbm93bjEQMA4GA1UEBxMHVW5rbm93bjEQMA4GA1UE
+ChMHVW5rbm93bjEQMA4GA1UECxMHVW5rbm93bjEQMA4GA1UEAxMHVW5rbm93bjCC
+ASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAINNcs4zCE389KJB2g7hIyO9
+WfntSGB+jGVy2INWrJwRmcY5+xlusvd3WBN8nNhuWrCT9E2Vy765+Y9kPNmGeU3T
+F9WqgmTcEnaFyPrSO4fPht2mFpOze2dvSyqNJ+9+KBQrCMrdPrljzdTjJA3fWhDb
+QzKQ4IFWyzWuLNbpXDHjibtYwpUT8q+OlEYp2RFnBQy64IbLs+rGUamMtpnC7d4/
+qu/BtbaqB0t2DFI/Q0eWS8l1Vyzp6u1rTr7ZjbE/+TVBSWXu6o9c0JhGL9xqcZ6p
+bkHPcZ5JQ7HjsYzFrkdJ0rHxZ5d1NZoHY+DmzQcjmZieXEEjVkwcPu8J7aLw2qcC
+AwEAAaMhMB8wHQYDVR0OBBYEFAIlzBgA+vkQ6gfK1aOzXusX7lERMA0GCSqGSIb3
+DQEBCwUAA4IBAQA4HS5iwTSECHu8EV6/s5/GRLgdTMlI9PeUXFiL7TMgSsjofcqM
+l6yT6M6RG5fZBgHAbrm/0OjK8Q5G9ptdJgjrC+qZlwU8NYFvOEUcDvWUpuzAdkQm
+hNSd1c8kbYl0YtAEE3VcburMi43TD8Pqu7M7T+zUJjZDwSuYTnWrsnirdVbDDc94
+yUsDhQRV9xHgJdf/D+4YpJersDf+qj6MVUI2st1rx8kaq43G3AftiMb/h8HwwART
+FfwSb3wPUytXQrNprechYUJA9s8SxXM1zk44fiXvLOR44bytLZnsmm3IiI900KWz
+NgZn/tJrrMXdvDClIAa2wz39NIbjZJOThCz+
+-----END CERTIFICATE-----
diff --git a/core/src/test/resources/key.pem b/core/src/test/resources/key.pem
new file mode 100644
index 000000000000..1ec52befe241
--- /dev/null
+++ b/core/src/test/resources/key.pem
@@ -0,0 +1,29 @@
+-----BEGIN ENCRYPTED PRIVATE KEY-----
+MIIE6TAbBgkqhkiG9w0BBQMwDgQIEFY0GxsC3RQCAggABIIEyAHqMgNVehmH7LYZ
+ZOGXokaX/Po+8s1Jo4IyDIzUb4xcLcZu4rNERUouj0cMyXCO8/FYeREE9zJi1f/o
+/SQjG84tXk9ikFvGBkNFgAzS5hl1tzICIAsbua/8jhZoB9AC7fTN7dfO7zywL/+b
+GkU0mhML+1KDgTRF3KQWbLuI4EieoQHbRjXqG0tENT3Bs7Fw2dGQ40+03aR4Hqc+
+dtuhBNOwEp+524P3PuKvkNH3c6hLtHY6WYotVN3wmrToqFPsOWUaxIQ3HjTLtifa
+DiGPRHPFRyOND9gupw2Vl8idsg8xWNtFxqrctWxJAqg0V04l2dntyIDZEMD1sg50
+g9+UQmMU8Z65WmpNFlXHLNSaL4WaEKDSIGKVX1dqKJHGII7BV5VKCGS8zewDBvcn
+MQBjzvXm+uXzHU7UXvVFDph+hXOU5NV6Qd8NYckhtsD0s3yTHMh0XZ45cckk6W4X
+G9CIiNNh+tPrbEXzxLylApE4jRnR5m9QUvKOuCTi3xeLCHf0/ADEcZSgUwM3U+J4
+GB8mIaEpj2X/ZxC1Rkm35UkhAMmFrlvspuaj+bAUHi0p2AE124xM1sP/kR/EAY2k
+t0x19rlWhHF06oMvrxyfDeAZWNutMIJL91LQKFQHiWXwkX9OfNH7f/su4hXdCvOH
+8h/IHT7la63+7hph9/NucphkR35AbvTNhHVICPQjd/jS4KPgMf66PehcLQilsmOA
+GgyNMS+An4zjb2oqeqE3ot8jQ8RCGhF20J/CkUM/Lpba0DaDbc0IzEISkNESSi/o
+Bcw9/WBzfG0VyJTZPMzVaEEhce46hMeRihFU4RWNdyc8reF1bzrvoY/aT8R2dIa0
+7Jkmw8OZHJqTV4Kii4c5GS/2d0BKMyxJ7GX2NF3GK7wDVgyAGYQoGlXXfHn4Di69
+9+41BAENvRXIV16UTTR3tsIUImpkxxyDKSV6ZSyL+JUchQADKI2wRpMFZA2BA/K3
+d1Jnouox/VIJ3AmjuqGUEz42M2Ufdl++pIjLNoKIhILPsDtZSG5dWdHrMzZElbkv
+Kd+p1pr+UG6TldZGDh5uWBHbAR5loCqqAacGZjFiTHE2stOni7OjQJ+3mMSGsqH1
+TO1U2/Iqy1oNYNBI1A8yvGYVGnpCp4a946AK7IbG85/WmD4VKY028UH9JqIfUEEJ
+K0nQ2fxRdGpyzNyU+AJJhTfdoN9wmI+vqV9uImXTMqgO6eDury1OQ2fdQA9EzY6W
+N9th4PGk5ALkiyM+AcYf7EIaZZbiPcX+qYZ+n9mEdwFqqZQe9H3VwBVVx0mO9c0+
+Cciy3mFC1p64dahjLKY1AyNNl8S7J6xLm2Cf2IW1yAvoAYC6DTi/uvpS/zIojsbl
+DFnLGD9jEec7VsNUsTaSQ1Qj9DBONH6r/2pn9TmmjgRTMCZzrG/725D8ztWDwT+q
+pVrXLZajtKhO41eSS63O43HbQceGLy1prembPLKIS6ivzEWdaAZy9TaJIA3sKr8u
+e3o13FIO6pdjJybAoqDOtadKLIf3/nPhAqhRDRZv6xe0ScetPZGouj0UlTuP/JR3
+Qu4mXp3Ys8/f1rjaV9aZGyx+0dNaSHP0myscTSJLlAwad1xSjQQFIem3JmTI9Zfa
+cvRQhVqZeFvyrnkh0Q==
+-----END ENCRYPTED PRIVATE KEY-----
diff --git a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
index 25b79bce6ab9..c11171c7f63e 100644
--- a/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SSLOptionsSuite.scala
@@ -20,8 +20,6 @@ package org.apache.spark
import java.io.File
import javax.net.ssl.SSLContext
-import com.google.common.io.Files
-import org.apache.spark.util.Utils
import org.scalatest.BeforeAndAfterAll
class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
@@ -29,6 +27,8 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("test resolving property file as spark conf ") {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+ val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+ val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
// Pick two cipher suites that the provider knows about
val sslContext = SSLContext.getInstance("TLSv1.2")
@@ -44,8 +44,13 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.privateKey", privateKeyPath)
+ conf.set("spark.ssl.certChain", certChainPath)
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.openSslEnabled", "false")
conf.set("spark.ssl.enabledAlgorithms", algorithms.mkString(","))
conf.set("spark.ssl.protocol", "TLSv1.2")
@@ -55,12 +60,21 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(opts.trustStore.isDefined === true)
assert(opts.trustStore.get.getName === "truststore")
assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.trustStorePassword === Some("password"))
+ assert(opts.trustStoreReloadingEnabled === false)
+ assert(opts.trustStoreReloadInterval === 10000)
+ assert(opts.privateKey.isDefined === true)
+ assert(opts.privateKey.get.getName === "key.pem")
+ assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+ assert(opts.certChain.isDefined === true)
+ assert(opts.certChain.get.getName === "certchain.pem")
+ assert(opts.certChain.get.getAbsolutePath === certChainPath)
assert(opts.keyStore.isDefined === true)
assert(opts.keyStore.get.getName === "keystore")
assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
- assert(opts.trustStorePassword === Some("password"))
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
+ assert(opts.openSslEnabled === false)
assert(opts.protocol === Some("TLSv1.2"))
assert(opts.enabledAlgorithms === algorithms)
}
@@ -68,14 +82,21 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("test resolving property with defaults specified ") {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+ val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+ val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
val conf = new SparkConf
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
conf.set("spark.ssl.keyStorePassword", "password")
conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.privateKey", privateKeyPath)
+ conf.set("spark.ssl.certChain", certChainPath)
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.openSslEnabled", "false")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ssl.protocol", "SSLv3")
@@ -87,12 +108,21 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(opts.trustStore.isDefined === true)
assert(opts.trustStore.get.getName === "truststore")
assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.privateKey.isDefined === true)
+ assert(opts.privateKey.get.getName === "key.pem")
+ assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+ assert(opts.certChain.isDefined === true)
+ assert(opts.certChain.get.getName === "certchain.pem")
+ assert(opts.certChain.get.getAbsolutePath === certChainPath)
assert(opts.keyStore.isDefined === true)
assert(opts.keyStore.get.getName === "keystore")
assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
- assert(opts.trustStorePassword === Some("password"))
assert(opts.keyStorePassword === Some("password"))
assert(opts.keyPassword === Some("password"))
+ assert(opts.trustStorePassword === Some("password"))
+ assert(opts.trustStoreReloadingEnabled === false)
+ assert(opts.trustStoreReloadInterval === 10000)
+ assert(opts.openSslEnabled === false)
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms ===
Set("TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA"))
@@ -101,6 +131,8 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
test("test whether defaults can be overridden ") {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
+ val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+ val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
val conf = new SparkConf
conf.set("spark.ssl.enabled", "true")
@@ -109,8 +141,13 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.ssl.keyStorePassword", "password")
conf.set("spark.ui.ssl.keyStorePassword", "12345")
conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.privateKey", privateKeyPath)
+ conf.set("spark.ssl.certChain", certChainPath)
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ui.ssl.trustStoreReloadingEnabled", "true")
+ conf.set("spark.ui.ssl.trustStoreReloadInterval", "20000")
+ conf.set("spark.ui.ssl.openSslEnabled", "true")
conf.set("spark.ssl.enabledAlgorithms",
"TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA")
conf.set("spark.ui.ssl.enabledAlgorithms", "ABC, DEF")
@@ -123,12 +160,21 @@ class SSLOptionsSuite extends SparkFunSuite with BeforeAndAfterAll {
assert(opts.trustStore.isDefined === true)
assert(opts.trustStore.get.getName === "truststore")
assert(opts.trustStore.get.getAbsolutePath === trustStorePath)
+ assert(opts.privateKey.isDefined === true)
+ assert(opts.privateKey.get.getName === "key.pem")
+ assert(opts.privateKey.get.getAbsolutePath === privateKeyPath)
+ assert(opts.certChain.isDefined === true)
+ assert(opts.certChain.get.getName === "certchain.pem")
+ assert(opts.certChain.get.getAbsolutePath === certChainPath)
assert(opts.keyStore.isDefined === true)
assert(opts.keyStore.get.getName === "keystore")
assert(opts.keyStore.get.getAbsolutePath === keyStorePath)
assert(opts.trustStorePassword === Some("password"))
assert(opts.keyStorePassword === Some("12345"))
assert(opts.keyPassword === Some("password"))
+ assert(opts.trustStoreReloadingEnabled === true)
+ assert(opts.trustStoreReloadInterval === 20000)
+ assert(opts.openSslEnabled === true)
assert(opts.protocol === Some("SSLv3"))
assert(opts.enabledAlgorithms === Set("ABC", "DEF"))
}
diff --git a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
index 33270bec6247..2453afc4782d 100644
--- a/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
+++ b/core/src/test/scala/org/apache/spark/SSLSampleConfigs.scala
@@ -19,8 +19,12 @@ package org.apache.spark
import java.io.File
+import org.apache.hadoop.conf.Configuration
+
object SSLSampleConfigs {
val keyStorePath = new File(this.getClass.getResource("/keystore").toURI).getAbsolutePath
+ val privateKeyPath = new File(this.getClass.getResource("/key.pem").toURI).getAbsolutePath
+ val certChainPath = new File(this.getClass.getResource("/certchain.pem").toURI).getAbsolutePath
val untrustedKeyStorePath = new File(
this.getClass.getResource("/untrusted-keystore").toURI).getAbsolutePath
val trustStorePath = new File(this.getClass.getResource("/truststore").toURI).getAbsolutePath
@@ -43,15 +47,75 @@ object SSLSampleConfigs {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.privateKey", privateKeyPath)
+ conf.set("spark.ssl.certChain", certChainPath)
+ conf.set("spark.ssl.keyStorePassword", "password")
+ conf.set("spark.ssl.keyPassword", "password")
+ conf.set("spark.ssl.trustStore", trustStorePath)
+ conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.openSslEnabled", "false")
+ conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
+ conf.set("spark.ssl.protocol", "TLSv1.2")
+ conf
+ }
+
+ def setSparkSSLConfig(conf: SparkConf): SparkConf = {
+ conf.set("spark.ssl.enabled", "true")
+ conf.set("spark.ssl.keyStore", keyStorePath)
+ conf.set("spark.ssl.privateKey", privateKeyPath)
+ conf.set("spark.ssl.certChain", certChainPath)
conf.set("spark.ssl.keyStorePassword", "password")
conf.set("spark.ssl.keyPassword", "password")
conf.set("spark.ssl.trustStore", trustStorePath)
conf.set("spark.ssl.trustStorePassword", "password")
+ conf.set("spark.ssl.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.openSslEnabled", "false")
conf.set("spark.ssl.enabledAlgorithms", enabledAlgorithms)
conf.set("spark.ssl.protocol", "TLSv1.2")
conf
}
+ def setSparkSSLShuffleConfig(conf: SparkConf): SparkConf = {
+ conf.set("spark.ssl.bts.enabled", "true")
+ conf.set("spark.ssl.bts.keyStore", keyStorePath)
+ conf.set("spark.ssl.bts.privateKey", privateKeyPath)
+ conf.set("spark.ssl.bts.certChain", certChainPath)
+ conf.set("spark.ssl.bts.keyStorePassword", "password")
+ conf.set("spark.ssl.bts.keyPassword", "password")
+ conf.set("spark.ssl.bts.trustStore", trustStorePath)
+ conf.set("spark.ssl.bts.trustStorePassword", "password")
+ conf.set("spark.ssl.bts.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.bts.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.bts.openSslEnabled", "false")
+ conf.set("spark.ssl.bts.enabledAlgorithms", enabledAlgorithms)
+ conf.set("spark.ssl.bts.protocol", "TLSv1.2")
+ conf
+ }
+
+ def setSparkSSLShuffleConfig(conf: Configuration): Configuration = {
+ conf.set("spark.ssl.bts.enabled", "true")
+ conf.set("spark.ssl.bts.keyStore", keyStorePath)
+ conf.set("spark.ssl.bts.privateKey", privateKeyPath)
+ conf.set("spark.ssl.bts.certChain", certChainPath)
+ conf.set("spark.ssl.bts.keyStorePassword", "password")
+ conf.set("spark.ssl.bts.keyPassword", "password")
+ conf.set("spark.ssl.bts.trustStore", trustStorePath)
+ conf.set("spark.ssl.bts.trustStorePassword", "password")
+ conf.set("spark.ssl.bts.trustStoreReloadingEnabled", "false")
+ conf.set("spark.ssl.bts.trustStoreReloadInterval", "10000")
+ conf.set("spark.ssl.bts.openSslEnabled", "false")
+ conf.set("spark.ssl.bts.enabledAlgorithms", enabledAlgorithms)
+ conf.set("spark.ssl.bts.protocol", "TLSv1.2")
+ conf
+ }
+
+ def sparkSSLOptions(): SSLOptions = {
+ SSLOptions.parse(sparkSSLConfig(), "spark.ssl")
+ }
+
def sparkSSLConfigUntrusted(): SparkConf = {
val conf = new SparkConf(loadDefaults = false)
conf.set("spark.ssl.enabled", "true")
diff --git a/core/src/test/scala/org/apache/spark/SecureExternalShuffleServiceSuite.scala b/core/src/test/scala/org/apache/spark/SecureExternalShuffleServiceSuite.scala
new file mode 100644
index 000000000000..f6f3e64a4c14
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SecureExternalShuffleServiceSuite.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.network.TransportContext
+import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
+
+/**
+ * This suite creates an external shuffle server and routes all shuffle fetches through it.
+ * Note that failures in this suite may arise due to changes in Spark that invalidate expectations
+ * set up in [[ExternalShuffleBlockHandler]], such as changing the format of shuffle files or how
+ * we hash files into folders.
+ */
+class SecureExternalShuffleServiceSuite extends ExternalShuffleServiceSuite {
+
+ override def beforeAll() {
+ SSLSampleConfigs.setSparkSSLConfig(conf)
+ val transportConf = SparkTransportConf.fromSparkConf(
+ conf, "shuffle", numUsableCores = 2, Some(SSLSampleConfigs.sparkSSLOptions()))
+ rpcHandler = new ExternalShuffleBlockHandler(transportConf, null)
+ val transportContext = new TransportContext(transportConf, rpcHandler)
+ server = transportContext.createServer()
+ conf.set("spark.shuffle.manager", "sort")
+ conf.set("spark.shuffle.service.enabled", "true")
+ conf.set("spark.shuffle.service.port", server.getPort.toString)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
index e0226803bb1c..f280f222e03d 100644
--- a/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SecurityManagerSuite.scala
@@ -207,6 +207,16 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.akkaSSLOptions.keyPassword === Some("password"))
assert(securityManager.akkaSSLOptions.protocol === Some("TLSv1.2"))
assert(securityManager.akkaSSLOptions.enabledAlgorithms === expectedAlgorithms)
+
+ assert(securityManager.btsSSLOptions.trustStore.isDefined === true)
+ assert(securityManager.btsSSLOptions.trustStore.get.getName === "truststore")
+ assert(securityManager.btsSSLOptions.keyStore.isDefined === true)
+ assert(securityManager.btsSSLOptions.keyStore.get.getName === "keystore")
+ assert(securityManager.btsSSLOptions.trustStorePassword === Some("password"))
+ assert(securityManager.btsSSLOptions.keyStorePassword === Some("password"))
+ assert(securityManager.btsSSLOptions.keyPassword === Some("password"))
+ assert(securityManager.btsSSLOptions.protocol === Some("TLSv1.2"))
+ assert(securityManager.btsSSLOptions.enabledAlgorithms === expectedAlgorithms)
}
test("ssl off setup") {
@@ -219,6 +229,7 @@ class SecurityManagerSuite extends SparkFunSuite with ResetSystemProperties {
assert(securityManager.fileServerSSLOptions.enabled === false)
assert(securityManager.akkaSSLOptions.enabled === false)
+ assert(securityManager.btsSSLOptions.enabled === false)
assert(securityManager.sslSocketFactory.isDefined === false)
assert(securityManager.hostnameVerifier.isDefined === false)
}
diff --git a/core/src/test/scala/org/apache/spark/SslShuffleNettySuite.scala b/core/src/test/scala/org/apache/spark/SslShuffleNettySuite.scala
new file mode 100644
index 000000000000..20b7c2298b61
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/SslShuffleNettySuite.scala
@@ -0,0 +1,12 @@
+package org.apache.spark
+
+/**
+ * Runs the Netty shuffle suite with SSL enabled for the block transfer service.
+ */
+class SslShuffleNettySuite extends ShuffleNettySuite {
+
+ override def beforeAll() {
+ conf.set("spark.shuffle.blockTransferService", "netty")
+ SSLSampleConfigs.setSparkSSLShuffleConfig(conf)
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
index 98da94139f7f..15dee1bd114c 100644
--- a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
+++ b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferSecuritySuite.scala
@@ -32,6 +32,8 @@ import org.apache.spark.network.shuffle.BlockFetchingListener
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
import org.apache.spark.storage.{BlockId, ShuffleBlockId}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
+import org.apache.spark.SSLSampleConfigs._
+
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
import org.scalatest.ShouldMatchers
@@ -46,6 +48,17 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
}
+ test("ssl with cloned config") {
+ val conf = sparkSSLConfig()
+ conf.set("spark.app.id", "app-id")
+ val conf1 = conf.clone
+
+ testConnection(conf, conf1) match {
+ case Success(_) => // expected
+ case Failure(t) => fail(t)
+ }
+ }
+
test("security on same password") {
val conf = new SparkConf()
.set("spark.authenticate", "true")
@@ -57,6 +70,17 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
}
+ test("security on same password over ssl") {
+ val conf = sparkSSLConfig()
+ .set("spark.authenticate", "true")
+ .set("spark.authenticate.secret", "good")
+ .set("spark.app.id", "app-id")
+ testConnection(conf, conf) match {
+ case Success(_) => // expected
+ case Failure(t) => fail(t)
+ }
+ }
+
test("security on mismatch password") {
val conf0 = new SparkConf()
.set("spark.authenticate", "true")
@@ -69,6 +93,18 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
}
+ test("security on mismatch password over ssl") {
+ val conf0 = sparkSSLConfig()
+ .set("spark.authenticate", "true")
+ .set("spark.authenticate.secret", "good")
+ .set("spark.app.id", "app-id")
+ val conf1 = conf0.clone.set("spark.authenticate.secret", "bad")
+ testConnection(conf0, conf1) match {
+ case Success(_) => fail("Should have failed")
+ case Failure(t) => t.getMessage should include ("Mismatched response")
+ }
+ }
+
test("security mismatch auth off on server") {
val conf0 = new SparkConf()
.set("spark.authenticate", "true")
@@ -81,6 +117,18 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
}
+ test("security mismatch auth off on server over ssl") {
+ val conf0 = sparkSSLConfig()
+ .set("spark.authenticate", "true")
+ .set("spark.authenticate.secret", "good")
+ .set("spark.app.id", "app-id")
+ val conf1 = conf0.clone.set("spark.authenticate", "false")
+ testConnection(conf0, conf1) match {
+ case Success(_) => fail("Should have failed")
+      case Failure(t) => // any error may occur; the server interprets the SASL token as an RPC
+ }
+ }
+
test("security mismatch auth off on client") {
val conf0 = new SparkConf()
.set("spark.authenticate", "false")
@@ -93,6 +141,18 @@ class NettyBlockTransferSecuritySuite extends SparkFunSuite with MockitoSugar wi
}
}
+ test("security mismatch auth off on client over ssl") {
+ val conf0 = sparkSSLConfig()
+ .set("spark.authenticate", "false")
+ .set("spark.authenticate.secret", "good")
+ .set("spark.app.id", "app-id")
+ val conf1 = conf0.clone.set("spark.authenticate", "true")
+ testConnection(conf0, conf1) match {
+ case Success(_) => fail("Should have failed")
+ case Failure(t) => t.getMessage should include ("Expected SaslMessage")
+ }
+ }
+
/**
* Creates two servers with different configurations and sees if they can talk.
* Returns Success() if they can transfer a block, and Failure() if the block transfer was failed
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSecuritySuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSecuritySuite.scala
new file mode 100644
index 000000000000..3afe64ffccb4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSecuritySuite.scala
@@ -0,0 +1,18 @@
+package org.apache.spark.storage
+
+import org.apache.spark.SSLSampleConfigs._
+import org.apache.spark.SparkConf
+
+/**
+ * Runs the block replication suite with SSL enabled for the block transfer service.
+ */
+class BlockManagerReplicationSecuritySuite extends BlockManagerReplicationSuite {
+
+  /**
+   * Create a [[SparkConf]] with SSL enabled for the block transfer service.
+   * @return an SSL-enabled [[SparkConf]] with the test app id set
+   */
+ override private[storage] def createConf(): SparkConf = {
+ sparkSSLConfig().set("spark.app.id", "test")
+ }
+}
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 6e3f500e15dc..2b455f825285 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -39,7 +39,7 @@ import org.apache.spark.storage.StorageLevel._
/** Testsuite that tests block replication in BlockManager */
class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
- private val conf = new SparkConf(false).set("spark.app.id", "test")
+ private var conf = createConf()
private var rpcEnv: RpcEnv = null
private var master: BlockManagerMaster = null
private val securityMgr = new SecurityManager(conf)
@@ -57,6 +57,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// Implicitly convert strings to BlockIds for test clarity.
private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ private[storage] def createConf(): SparkConf = new SparkConf(false).set("spark.app.id", "test")
+
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 32c34c63a45c..831f3d03207c 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -83,6 +83,12 @@
      <artifactId>slf4j-log4j12</artifactId>
      <scope>test</scope>
+    <dependency>
+      <groupId>org.bouncycastle</groupId>
+      <artifactId>bcprov-jdk16</artifactId>
+      <version>1.46</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/network/common/src/main/java/org/apache/spark/network/TransportContext.java b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
index 238710d17249..29dc0f475cff 100644
--- a/network/common/src/main/java/org/apache/spark/network/TransportContext.java
+++ b/network/common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -20,12 +20,21 @@
import java.util.List;
import com.google.common.collect.Lists;
+
import io.netty.channel.Channel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.handler.codec.MessageToMessageEncoder;
+
+import org.apache.spark.network.util.ssl.NettySslEncryptionHandler;
+import org.apache.spark.network.util.ssl.NoSslEncryptionHandler;
+import org.apache.spark.network.util.ssl.SSLFactory;
+import org.apache.spark.network.util.ssl.SslEncryptionHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.spark.network.protocol.Message;
+import org.apache.spark.network.protocol.SslMessageEncoder;
import org.apache.spark.network.client.TransportClient;
import org.apache.spark.network.client.TransportClientBootstrap;
import org.apache.spark.network.client.TransportClientFactory;
@@ -60,8 +69,9 @@ public class TransportContext {
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
+ private final SslEncryptionHandler sslEncryptionHandler;
- private final MessageEncoder encoder;
+  private final MessageToMessageEncoder<Message> encoder;
private final MessageDecoder decoder;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
@@ -74,8 +84,10 @@ public TransportContext(
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
- this.encoder = new MessageEncoder();
+ this.sslEncryptionHandler = createSslEncryptionHandler();
this.decoder = new MessageDecoder();
+ this.encoder =
+ (this.sslEncryptionHandler.isEnabled() ? new SslMessageEncoder() : new MessageEncoder());
this.closeIdleConnections = closeIdleConnections;
}
@@ -116,6 +128,14 @@ public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
+  /**
+   * Returns the configured {@link SslEncryptionHandler}.
+   * @return the SSL encryption handler created from this context's configuration
+   */
+ public SslEncryptionHandler getSslEncryptionHandler() {
+ return sslEncryptionHandler;
+ }
+
/**
* Initializes a client or server Netty Channel Pipeline which encodes/decodes messages and
* has a {@link org.apache.spark.network.server.TransportChannelHandler} to handle request or
@@ -148,6 +168,31 @@ public TransportChannelHandler initializePipeline(
}
}
+  /**
+   * Creates a {@link NettySslEncryptionHandler} when SSL is enabled for the shuffle
+   * transport, or a no-op {@link NoSslEncryptionHandler} otherwise.
+   */
+ private SslEncryptionHandler createSslEncryptionHandler() {
+ if (conf.sslShuffleEnabled()) {
+ return new NettySslEncryptionHandler(
+ new SSLFactory.Builder()
+ .requestedProtocol(conf.sslShuffleProtocol())
+ .requestedCiphers(conf.sslShuffleRequestedCiphers())
+ .keyStore(conf.sslShuffleKeyStore(), conf.sslShuffleKeyStorePassword())
+ .privateKey(conf.sslShufflePrivateKey())
+ .keyPassword(conf.sslShuffleKeyPassword())
+ .certChain(conf.sslShuffleCertChain())
+ .trustStore(
+ conf.sslShuffleTrustStore(),
+ conf.sslShuffleTrustStorePassword(),
+ conf.sslShuffleTrustStoreReloadingEnabled(),
+ conf.sslShuffleTrustStoreReloadInterval())
+ .build());
+ } else {
+ return new NoSslEncryptionHandler();
+ }
+ }
+
/**
* Creates the server- and client-side handler which is used to handle both RequestMessages and
* ResponseMessages. The channel is expected to have been successfully created, though certain
diff --git a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
index 844eff4f4c70..8c014f43ed76 100644
--- a/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
+++ b/network/common/src/main/java/org/apache/spark/network/buffer/FileSegmentManagedBuffer.java
@@ -29,6 +29,7 @@
import com.google.common.io.ByteStreams;
import io.netty.channel.DefaultFileRegion;
+import io.netty.handler.stream.ChunkedFile;
import org.apache.spark.network.util.JavaUtils;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportConf;
@@ -129,7 +130,10 @@ public ManagedBuffer release() {
@Override
public Object convertToNetty() throws IOException {
- if (conf.lazyFileDescriptor()) {
+ if (conf.sslShuffleEnabled()) {
+      // Cannot use zero-copy with SSL; the bytes must pass through the SSL handler
+      return new ChunkedFile(
+        new RandomAccessFile(file, "r"), offset, length, conf.sslShuffleChunkSize());
+ } else if (conf.lazyFileDescriptor()) {
return new LazyFileRegion(file, offset, length);
} else {
FileChannel fileChannel = new FileInputStream(file).getChannel();
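
The reason for the fallback above: zero-copy FileRegion transfers go kernel-to-socket and never surface in the channel pipeline, so an SSL handler cannot encrypt them; ChunkedFile reads the file through user space in chunks that do flow through the handlers. A reduced sketch of the decision (the chunk size constant is illustrative):

```scala
import java.io.{File, FileInputStream, RandomAccessFile}
import io.netty.channel.DefaultFileRegion
import io.netty.handler.stream.ChunkedFile

def toNetty(sslEnabled: Boolean, file: File, offset: Long, length: Long): AnyRef =
  if (sslEnabled) {
    // Chunks pass through the pipeline, so the SSL handler can encrypt them.
    new ChunkedFile(new RandomAccessFile(file, "r"), offset, length, 64 * 1024)
  } else {
    // Zero-copy: bytes bypass user space entirely.
    new DefaultFileRegion(new FileInputStream(file).getChannel, offset, length)
  }
```
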
diff --git a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
index 61bafc838004..a28193eb9e73 100644
--- a/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
+++ b/network/common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java
@@ -40,6 +40,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.spark.network.util.ssl.SslEncryptionHandler;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.server.TransportChannelHandler;
import org.apache.spark.network.util.IOMode;
@@ -80,6 +81,8 @@ public ClientPool(int size) {
  private final List<TransportClientBootstrap> clientBootstraps;
  private final ConcurrentHashMap<SocketAddress, ClientPool> connectionPool;
+ private SslEncryptionHandler sslEncryptionHandler;
+
/** Random number generator for picking connections between peers. */
private final Random rand;
private final int numConnectionsPerPeer;
@@ -89,14 +92,15 @@ public ClientPool(int size) {
private PooledByteBufAllocator pooledAllocator;
public TransportClientFactory(
-      TransportContext context,
-      List<TransportClientBootstrap> clientBootstraps) {
+    TransportContext context,
+    List<TransportClientBootstrap> clientBootstraps) {
this.context = Preconditions.checkNotNull(context);
this.conf = context.getConf();
this.clientBootstraps = Lists.newArrayList(Preconditions.checkNotNull(clientBootstraps));
    this.connectionPool = new ConcurrentHashMap<SocketAddress, ClientPool>();
this.numConnectionsPerPeer = conf.numConnectionsPerPeer();
this.rand = new Random();
+ this.sslEncryptionHandler = context.getSslEncryptionHandler();
IOMode ioMode = IOMode.valueOf(conf.ioMode());
this.socketChannelClass = NettyUtils.getClientChannelClass(ioMode);
@@ -185,29 +189,43 @@ public TransportClient createUnmanagedClient(String remoteHost, int remotePort)
private TransportClient createClient(InetSocketAddress address) throws IOException {
logger.debug("Creating new connection to " + address);
- Bootstrap bootstrap = new Bootstrap();
- bootstrap.group(workerGroup)
- .channel(socketChannelClass)
- // Disable Nagle's Algorithm since we don't want packets to wait
- .option(ChannelOption.TCP_NODELAY, true)
- .option(ChannelOption.SO_KEEPALIVE, true)
- .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
- .option(ChannelOption.ALLOCATOR, pooledAllocator);
-
+ Bootstrap bootstrap = buildBootstrap();
    final AtomicReference<TransportClient> clientRef = new AtomicReference<TransportClient>();
    final AtomicReference<Channel> channelRef = new AtomicReference<Channel>();
-
-    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) {
- TransportChannelHandler clientHandler = context.initializePipeline(ch);
- clientRef.set(clientHandler.getClient());
- channelRef.set(ch);
- }
- });
+ initHandler(bootstrap, clientRef, channelRef);
// Connect to the remote server
long preConnect = System.nanoTime();
+ ChannelFuture cf = connect(bootstrap, conf, address);
+ sslEncryptionHandler.onConnect(cf, address, conf.connectionTimeoutMs());
+
+ TransportClient client = clientRef.get();
+ Channel channel = channelRef.get();
+ assert client != null : "Channel future completed successfully with null client";
+
+ // Execute any client bootstraps synchronously before marking the Client as successful.
+ long preBootstrap = System.nanoTime();
+ logger.debug("Connection to {} successful, running bootstraps...", address);
+ doBootstraps(client, channel, preBootstrap);
+ long postBootstrap = System.nanoTime();
+
+ logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
+ address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+
+ return client;
+ }
+
+  /**
+   * Connects the bootstrap to the remote address, enforcing the configured timeout.
+   * @param bootstrap the client bootstrap to connect
+   * @param conf the transport configuration supplying the connection timeout
+   * @param address the remote address to connect to
+   * @throws IOException if the connection times out or fails
+   */
+ private ChannelFuture connect(
+ Bootstrap bootstrap,
+ TransportConf conf,
+ InetSocketAddress address) throws IOException {
ChannelFuture cf = bootstrap.connect(address);
if (!cf.awaitUninterruptibly(conf.connectionTimeoutMs())) {
throw new IOException(
@@ -215,14 +233,33 @@ public void initChannel(SocketChannel ch) {
} else if (cf.cause() != null) {
throw new IOException(String.format("Failed to connect to %s", address), cf.cause());
}
+ return cf;
+ }
- TransportClient client = clientRef.get();
- Channel channel = channelRef.get();
- assert client != null : "Channel future completed successfully with null client";
+  /**
+   * Builds a client Bootstrap with the socket options shared by all connections.
+   * @return the configured Bootstrap
+   */
+ private Bootstrap buildBootstrap() {
+ return new Bootstrap()
+ .group(workerGroup)
+ .channel(socketChannelClass)
+      // Disable Nagle's Algorithm since we don't want packets to wait
+      .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.connectionTimeoutMs())
+ .option(ChannelOption.ALLOCATOR, pooledAllocator);
+ }
- // Execute any client bootstraps synchronously before marking the Client as successful.
- long preBootstrap = System.nanoTime();
- logger.debug("Connection to {} successful, running bootstraps...", address);
+  /**
+   * Execute all of our {@link TransportClientBootstrap} bootstraps synchronously.
+   * @param client the newly created client
+   * @param channel the channel backing the client
+   * @param preBootstrap nanoTime taken before bootstrapping, used for logging
+   */
+ private void doBootstraps(
+ TransportClient client,
+ Channel channel,
+ long preBootstrap) {
try {
for (TransportClientBootstrap clientBootstrap : clientBootstraps) {
clientBootstrap.doBootstrap(client, channel);
@@ -233,12 +270,25 @@ public void initChannel(SocketChannel ch) {
client.close();
throw Throwables.propagate(e);
}
- long postBootstrap = System.nanoTime();
-
- logger.debug("Successfully created connection to {} after {} ms ({} ms spent in bootstraps)",
- address, (postBootstrap - preConnect) / 1000000, (postBootstrap - preBootstrap) / 1000000);
+ }
- return client;
+  /**
+   * Installs the channel initializer that builds the pipeline (adding the SSL
+   * handler when enabled) and captures the created client and channel.
+   */
+  private void initHandler(
+      final Bootstrap bootstrap,
+      final AtomicReference<TransportClient> clientRef,
+      final AtomicReference<Channel> channelRef) {
+    bootstrap.handler(new ChannelInitializer<SocketChannel>() {
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ TransportChannelHandler clientHandler = context.initializePipeline(ch);
+ sslEncryptionHandler.addToPipeline(ch.pipeline(), true);
+ clientRef.set(clientHandler.getClient());
+ channelRef.set(ch);
+ }
+ });
}
/** Close all connections in the connection pool, and shutdown the worker thread pool. */
@@ -260,5 +310,10 @@ public void close() {
workerGroup.shutdownGracefully();
workerGroup = null;
}
+
+ if (sslEncryptionHandler != null) {
+ sslEncryptionHandler.close();
+ sslEncryptionHandler = null;
+ }
}
}
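
The `onConnect` hook above gives the SSL layer a chance to block until the TLS handshake finishes before client bootstraps run. In stock Netty terms that is roughly the following (a sketch, assuming Netty's standard SslHandler is present in the pipeline; not the patch's actual implementation):

```scala
import io.netty.channel.ChannelFuture
import io.netty.handler.ssl.SslHandler

// Wait for the TLS handshake on a freshly connected channel, or fail.
def awaitHandshake(cf: ChannelFuture, timeoutMs: Long): Unit = {
  val ssl = cf.channel().pipeline().get(classOf[SslHandler])
  if (ssl != null && !ssl.handshakeFuture().awaitUninterruptibly(timeoutMs)) {
    throw new java.io.IOException("SSL handshake timed out")
  }
}
```
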
diff --git a/network/common/src/main/java/org/apache/spark/network/protocol/SslMessageEncoder.java b/network/common/src/main/java/org/apache/spark/network/protocol/SslMessageEncoder.java
new file mode 100644
index 000000000000..63c52f38dfe8
--- /dev/null
+++ b/network/common/src/main/java/org/apache/spark/network/protocol/SslMessageEncoder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.network.protocol;
+
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.stream.ChunkedFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.util.ChunkedFileWithHeader;
+
+/**
+ * Encoder used by the server side to encode secure (SSL) server-to-client responses.
+ * This encoder is stateless so it is safe to be shared by multiple threads.
+ */
+@ChannelHandler.Sharable
+public final class SslMessageEncoder extends MessageToMessageEncoder<Message> {
+
+ private final Logger logger = LoggerFactory.getLogger(SslMessageEncoder.class);
+
+ /**
+ * Encodes a Message by invoking its encode() method. For non-data messages, we will add one
+ * ByteBuf to 'out' containing the total frame length, the message type, and the message itself.
+ * In the case of a ChunkFetchSuccess, we will also add the ManagedBuffer corresponding to the
+ * data to 'out'.
+   * @param ctx the channel handler context
+   * @param in the message to encode
+   * @param out the list that receives the encoded objects
+   */
+ @Override
+ public void encode(ChannelHandlerContext ctx, Message in, List