Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e8256e5
support https in spark web ui
scwf Aug 16, 2014
8f7cc96
add unit test
scwf Aug 16, 2014
c90d84e
fix according to comments
scwf Sep 21, 2014
de8d1bd
fix scalastyle
scwf Sep 21, 2014
35074fd
fix workerinfo in JsonProtocolSuite
scwf Sep 21, 2014
967950f
Merge branch 'master' of github.com:apache/spark into https
scwf Sep 21, 2014
9591c9c
import org.eclipse.jetty.server.Server to fix test error
scwf Sep 21, 2014
333334a
fix comments
scwf Sep 25, 2014
64d7dc0
add redirect from http to https
scwf Oct 1, 2014
89bf986
revert debug code
scwf Oct 1, 2014
8e5132d
merge with apache/master
scwf Oct 1, 2014
677b746
add https/ssl to docs
scwf Oct 1, 2014
a4ce923
fix docs
scwf Oct 1, 2014
6c31dc7
fix code format
scwf Oct 1, 2014
7a898fb
fix securePort
scwf Oct 1, 2014
a29ec86
fix conflict with apache/master
scwf Oct 1, 2014
55bbc5f
merge with apache/master and fix conflict
scwf Oct 3, 2014
e5c87cb
fix comments by JoshRosen
scwf Oct 3, 2014
baaa1ce
fix conflict
scwf Oct 4, 2014
a48c6fc
address JoshRosen's comments
scwf Oct 5, 2014
2dadb2f
address vanzin's comments
scwf Oct 7, 2014
8b32853
Merge branch 'master' of https://github.com/apache/spark into https
scwf Oct 7, 2014
3b01d3a
add reference to method newURI
scwf Oct 10, 2014
4ae834b
Merge branch 'master' of https://github.com/apache/spark into https
scwf Oct 10, 2014
d80f7e9
rebase based on https://github.com/apache/spark/pull/1980
WangTaoTheTonic Apr 23, 2015
9deebf3
rewrite using SSLOptions
WangTaoTheTonic Apr 25, 2015
7def14e
fix uisuites
WangTaoTheTonic Apr 25, 2015
18982b4
use spark.ssl.ui.* instead, update docs, fixes in Suite and other min…
WangTaoTheTonic May 4, 2015
dfbe1d6
per Marcelo's comments
WangTaoTheTonic May 5, 2015
872fee3
Merge branch 'master' into SPARK-2750
Dec 9, 2015
6e77187
Allow different UI SSL configs per process.
Dec 9, 2015
a555e43
Warn if store type is defined when creating akka config.
Dec 10, 2015
f6f1dab
Rename a method, and error out on invalid config.
Dec 10, 2015
c69b03b
Fix unit test.
Dec 14, 2015
5f896b3
Merge branch 'master' into SPARK-2750
Dec 30, 2015
620f56e
Merge branch 'master' into SPARK-2750
Jan 4, 2016
6840eee
Built correct URL for worker web UI.
Jan 4, 2016
d19ae29
Merge branch 'master' into SPARK-2750
Jan 6, 2016
66bf7e6
Import order.
Jan 6, 2016
123d958
Merge branch 'master' into SPARK-2750
Jan 6, 2016
330e984
Merge branch 'master' into SPARK-2750
Jan 13, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 45 additions & 5 deletions core/src/main/scala/org/apache/spark/SSLOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,11 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
* @param keyStore a path to the key-store file
* @param keyStorePassword a password to access the key-store file
* @param keyPassword a password to access the private key in the key-store
* @param keyStoreType the type of the key-store
* @param needClientAuth set true if SSL needs client authentication
* @param trustStore a path to the trust-store file
* @param trustStorePassword a password to access the trust-store file
* @param trustStoreType the type of the trust-store
* @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
* @param enabledAlgorithms a set of encryption algorithms that may be used
*/
Expand All @@ -49,8 +52,11 @@ private[spark] case class SSLOptions(
keyStore: Option[File] = None,
keyStorePassword: Option[String] = None,
keyPassword: Option[String] = None,
keyStoreType: Option[String] = None,
needClientAuth: Boolean = false,
trustStore: Option[File] = None,
trustStorePassword: Option[String] = None,
trustStoreType: Option[String] = None,
protocol: Option[String] = None,
enabledAlgorithms: Set[String] = Set.empty)
extends Logging {
Expand All @@ -63,12 +69,18 @@ private[spark] case class SSLOptions(
val sslContextFactory = new SslContextFactory()

keyStore.foreach(file => sslContextFactory.setKeyStorePath(file.getAbsolutePath))
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
keyStorePassword.foreach(sslContextFactory.setKeyStorePassword)
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
keyStoreType.foreach(sslContextFactory.setKeyStoreType)
if (needClientAuth) {
trustStore.foreach(file => sslContextFactory.setTrustStore(file.getAbsolutePath))
trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
trustStoreType.foreach(sslContextFactory.setTrustStoreType)
}
protocol.foreach(sslContextFactory.setProtocol)
sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
if (supportedAlgorithms.nonEmpty) {
sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
}

Some(sslContextFactory)
} else {
Expand All @@ -82,6 +94,13 @@ private[spark] case class SSLOptions(
*/
def createAkkaConfig: Option[Config] = {
if (enabled) {
if (keyStoreType.isDefined) {
logWarning("Akka configuration does not support key store type.");
}
if (trustStoreType.isDefined) {
logWarning("Akka configuration does not support trust store type.");
}

Some(ConfigFactory.empty()
.withValue("akka.remote.netty.tcp.security.key-store",
ConfigValueFactory.fromAnyRef(keyStore.map(_.getAbsolutePath).getOrElse("")))
Expand Down Expand Up @@ -110,7 +129,9 @@ private[spark] case class SSLOptions(
* The supportedAlgorithms set is a subset of the enabledAlgorithms that
* are supported by the current Java security provider for this protocol.
*/
private val supportedAlgorithms: Set[String] = {
private val supportedAlgorithms: Set[String] = if (enabledAlgorithms.isEmpty) {
Set()
} else {
var context: SSLContext = null
try {
context = SSLContext.getInstance(protocol.orNull)
Expand All @@ -133,7 +154,11 @@ private[spark] case class SSLOptions(
logDebug(s"Discarding unsupported cipher $cipher")
}

enabledAlgorithms & providerAlgorithms
val supported = enabledAlgorithms & providerAlgorithms
require(supported.nonEmpty || sys.env.contains("SPARK_TESTING"),
"SSLContext does not support any of the enabled algorithms: " +
enabledAlgorithms.mkString(","))
supported
}

/** Returns a string representation of this SSLOptions with all the passwords masked. */
Expand All @@ -153,9 +178,12 @@ private[spark] object SSLOptions extends Logging {
* $ - `[ns].keyStore` - a path to the key-store file; can be relative to the current directory
* $ - `[ns].keyStorePassword` - a password to the key-store file
* $ - `[ns].keyPassword` - a password to the private key
* $ - `[ns].keyStoreType` - the type of the key-store
* $ - `[ns].needClientAuth` - whether SSL needs client authentication
* $ - `[ns].trustStore` - a path to the trust-store file; can be relative to the current
* directory
* $ - `[ns].trustStorePassword` - a password to the trust-store file
* $ - `[ns].trustStoreType` - the type of trust-store
* $ - `[ns].protocol` - a protocol name supported by a particular Java version
* $ - `[ns].enabledAlgorithms` - a comma separated list of ciphers
*
Expand Down Expand Up @@ -183,12 +211,21 @@ private[spark] object SSLOptions extends Logging {
val keyPassword = conf.getOption(s"$ns.keyPassword")
.orElse(defaults.flatMap(_.keyPassword))

val keyStoreType = conf.getOption(s"$ns.keyStoreType")
.orElse(defaults.flatMap(_.keyStoreType))

val needClientAuth =
conf.getBoolean(s"$ns.needClientAuth", defaultValue = defaults.exists(_.needClientAuth))

val trustStore = conf.getOption(s"$ns.trustStore").map(new File(_))
.orElse(defaults.flatMap(_.trustStore))

val trustStorePassword = conf.getOption(s"$ns.trustStorePassword")
.orElse(defaults.flatMap(_.trustStorePassword))

val trustStoreType = conf.getOption(s"$ns.trustStoreType")
.orElse(defaults.flatMap(_.trustStoreType))

val protocol = conf.getOption(s"$ns.protocol")
.orElse(defaults.flatMap(_.protocol))

Expand All @@ -202,8 +239,11 @@ private[spark] object SSLOptions extends Logging {
keyStore,
keyStorePassword,
keyPassword,
keyStoreType,
needClientAuth,
trustStore,
trustStorePassword,
trustStoreType,
protocol,
enabledAlgorithms)
}
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -244,14 +244,8 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
// the default SSL configuration - it will be used by all communication layers unless overwritten
private val defaultSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl", defaults = None)

// SSL configuration for different communication layers - they can override the default
// configuration at a specified namespace. The namespace *must* start with spark.ssl.
val fileServerSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.fs", Some(defaultSSLOptions))
val akkaSSLOptions = SSLOptions.parse(sparkConf, "spark.ssl.akka", Some(defaultSSLOptions))

logDebug(s"SSLConfiguration for file server: $fileServerSSLOptions")
logDebug(s"SSLConfiguration for Akka: $akkaSSLOptions")

// SSL configuration for the file server. This is used by Utils.setupSecureURLConnection().
val fileServerSSLOptions = getSSLOptions("fs")
val (sslSocketFactory, hostnameVerifier) = if (fileServerSSLOptions.enabled) {
val trustStoreManagers =
for (trustStore <- fileServerSSLOptions.trustStore) yield {
Expand Down Expand Up @@ -292,6 +286,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf)
(None, None)
}

def getSSLOptions(module: String): SSLOptions = {
val opts = SSLOptions.parse(sparkConf, s"spark.ssl.$module", Some(defaultSSLOptions))
logDebug(s"Created SSL options for $module: $opts")
opts
}

/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ private[deploy] object DeployMessages {
worker: RpcEndpointRef,
cores: Int,
memory: Int,
webUiPort: Int,
publicAddress: String)
workerWebUiUrl: String)
extends DeployMessage {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class HistoryServer(
provider: ApplicationHistoryProvider,
securityManager: SecurityManager,
port: Int)
extends WebUI(securityManager, port, conf) with Logging with UIRoot {
extends WebUI(securityManager, securityManager.getSSLOptions("historyServer"), port, conf)
with Logging with UIRoot {

// How many applications to retain
private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
Expand Down Expand Up @@ -233,7 +234,7 @@ object HistoryServer extends Logging {

val UI_PATH_PREFIX = "/history"

def main(argStrings: Array[String]) {
def main(argStrings: Array[String]): Unit = {
Utils.initDaemon(log)
new HistoryServerArguments(conf, argStrings)
initSecurity()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ private[deploy] class Master(

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case RegisterWorker(
id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
Expand All @@ -392,7 +392,7 @@ private[deploy] class Master(
context.reply(RegisterWorkerFailed("Duplicate worker ID"))
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
workerRef, workerUiPort, publicAddress)
workerRef, workerWebUiUrl)
if (registerWorker(worker)) {
persistenceEngine.addWorker(worker)
context.reply(RegisteredWorker(self, masterWebUiUrl))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ private[spark] class WorkerInfo(
val cores: Int,
val memory: Int,
val endpoint: RpcEndpointRef,
val webUiPort: Int,
val publicAddress: String)
val webUiAddress: String)
extends Serializable {

Utils.checkHost(host, "Expected hostname")
Expand Down Expand Up @@ -98,10 +97,6 @@ private[spark] class WorkerInfo(
coresUsed -= driver.desc.cores
}

def webUiAddress : String = {
"http://" + this.publicAddress + ":" + this.webUiPort
}

def setState(state: WorkerState.Value): Unit = {
this.state = state
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class MasterWebUI(
val master: Master,
requestedPort: Int,
customMasterPage: Option[MasterPage] = None)
extends WebUI(master.securityMgr, requestedPort, master.conf, name = "MasterUI") with Logging
with UIRoot {
extends WebUI(master.securityMgr, master.securityMgr.getSSLOptions("standalone"),
requestedPort, master.conf, name = "MasterUI") with Logging with UIRoot {

val masterEndpointRef = master.self
val killEnabled = master.conf.getBoolean("spark.ui.killEnabled", true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[spark] class MesosClusterUI(
conf: SparkConf,
dispatcherPublicAddress: String,
val scheduler: MesosClusterScheduler)
extends WebUI(securityManager, port, conf) {
extends WebUI(securityManager, securityManager.getSSLOptions("mesos"), port, conf) {

initialize()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ private[deploy] class Worker(
private var master: Option[RpcEndpointRef] = None
private var activeMasterUrl: String = ""
private[worker] var activeMasterWebUiUrl : String = ""
private var workerWebUiUrl: String = ""
private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString
private var registered = false
private var connected = false
Expand Down Expand Up @@ -184,6 +185,9 @@ private[deploy] class Worker(
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
webUi.bind()

val scheme = if (webUi.sslOptions.enabled) "https" else "http"
workerWebUiUrl = s"$scheme://$publicAddress:${webUi.boundPort}"
registerWithMaster()

metricsSystem.registerSource(workerSource)
Expand Down Expand Up @@ -336,7 +340,7 @@ private[deploy] class Worker(

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
workerId, host, port, self, cores, memory, webUi.boundPort, publicAddress))
workerId, host, port, self, cores, memory, workerWebUiUrl))
.onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class WorkerWebUI(
val worker: Worker,
val workDir: File,
requestedPort: Int)
extends WebUI(worker.securityMgr, requestedPort, worker.conf, name = "WorkerUI")
extends WebUI(worker.securityMgr, worker.securityMgr.getSSLOptions("standalone"),
requestedPort, worker.conf, name = "WorkerUI")
with Logging {

private[ui] val timeout = RpcUtils.askRpcTimeout(worker.conf)
Expand Down
Loading