
Commit 7edbea4

SPARK-1189: Add Security to Spark - Akka, Http, ConnectionManager, UI use servlets
resubmit pull request. was https://github.com/apache/incubator-spark/pull/332.

Author: Thomas Graves <[email protected]>

Closes #33 from tgravescs/security-branch-0.9-with-client-rebase and squashes the following commits:

dfe3918 [Thomas Graves] Fix merge conflict since startUserClass now using runAsUser
05eebed [Thomas Graves] Fix dependency lost in upmerge
d1040ec [Thomas Graves] Fix up various imports
05ff5e0 [Thomas Graves] Fix up imports after upmerging to master
ac046b3 [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase
13733e1 [Thomas Graves] Pass securityManager and SparkConf around where we can. Switch to use sparkConf for reading config whereever possible. Added ConnectionManagerSuite unit tests.
4a57acc [Thomas Graves] Change UI createHandler routines to createServlet since they now return servlets
2f77147 [Thomas Graves] Rework from comments
50dd9f2 [Thomas Graves] fix header in SecurityManager
ecbfb65 [Thomas Graves] Fix spacing and formatting
b514bec [Thomas Graves] Fix reference to config
ed3d1c1 [Thomas Graves] Add security.md
6f7ddf3 [Thomas Graves] Convert SaslClient and SaslServer to scala, change spark.authenticate.ui to spark.ui.acls.enable, and fix up various other things from review comments
2d9e23e [Thomas Graves] Merge remote-tracking branch 'upstream/master' into security-branch-0.9-with-client-rebase_rework
5721c5a [Thomas Graves] update AkkaUtilsSuite test for the actorSelection changes, fix typos based on comments, and remove extra lines I missed in rebase from AkkaUtils
f351763 [Thomas Graves] Add Security to Spark - Akka, Http, ConnectionManager, UI to use servlets
1 parent 40566e1 commit 7edbea4
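
The security knobs that show up throughout this diff are plain Spark configuration properties: spark.authenticate, spark.authenticate.secret, spark.ui.acls.enable and spark.ui.view.acls. As a rough illustration only (the secret value and user names below are invented, and on YARN the secret is generated automatically rather than read from config), a driver could opt in like this:

    import org.apache.spark.SparkConf

    // Hypothetical driver-side configuration for a non-YARN deployment.
    val conf = new SparkConf()
      .set("spark.authenticate", "true")           // turn on shared-secret authentication
      .set("spark.authenticate.secret", "s3cr3t")  // required when not running on YARN
      .set("spark.ui.acls.enable", "true")         // check the logged-in UI user against the view acls
      .set("spark.ui.view.acls", "alice,bob")      // extra users allowed to view the UI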

72 files changed, +2251 -292 lines changed


core/pom.xml

Lines changed: 16 additions & 0 deletions
@@ -64,6 +64,18 @@
       <groupId>org.apache.curator</groupId>
       <artifactId>curator-recipes</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-plus</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-security</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.eclipse.jetty</groupId>
+      <artifactId>jetty-util</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
@@ -118,6 +130,10 @@
       <artifactId>chill-java</artifactId>
       <version>0.3.1</version>
     </dependency>
+    <dependency>
+      <groupId>commons-net</groupId>
+      <artifactId>commons-net</artifactId>
+    </dependency>
     <dependency>
       <groupId>${akka.group}</groupId>
       <artifactId>akka-remote_${scala.binary.version}</artifactId>

core/src/main/scala/org/apache/spark/HttpFileServer.scala

Lines changed: 3 additions & 2 deletions
@@ -23,7 +23,7 @@ import com.google.common.io.Files
 
 import org.apache.spark.util.Utils
 
-private[spark] class HttpFileServer extends Logging {
+private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {
 
   var baseDir : File = null
   var fileDir : File = null
@@ -38,9 +38,10 @@ private[spark] class HttpFileServer extends Logging {
     fileDir.mkdir()
     jarDir.mkdir()
     logInfo("HTTP File server directory is " + baseDir)
-    httpServer = new HttpServer(baseDir)
+    httpServer = new HttpServer(baseDir, securityManager)
    httpServer.start()
    serverUri = httpServer.uri
+    logDebug("HTTP file server started at: " + serverUri)
  }
 
  def stop() {

core/src/main/scala/org/apache/spark/HttpServer.scala

Lines changed: 55 additions & 5 deletions
@@ -19,15 +19,18 @@ package org.apache.spark
 
 import java.io.File
 
+import org.eclipse.jetty.util.security.{Constraint, Password}
+import org.eclipse.jetty.security.authentication.DigestAuthenticator
+import org.eclipse.jetty.security.{ConstraintMapping, ConstraintSecurityHandler, HashLoginService, SecurityHandler}
+
 import org.eclipse.jetty.server.Server
 import org.eclipse.jetty.server.bio.SocketConnector
-import org.eclipse.jetty.server.handler.DefaultHandler
-import org.eclipse.jetty.server.handler.HandlerList
-import org.eclipse.jetty.server.handler.ResourceHandler
+import org.eclipse.jetty.server.handler.{DefaultHandler, HandlerList, ResourceHandler}
 import org.eclipse.jetty.util.thread.QueuedThreadPool
 
 import org.apache.spark.util.Utils
 
+
 /**
  * Exception type thrown by HttpServer when it is in the wrong state for an operation.
  */
@@ -38,7 +41,8 @@ private[spark] class ServerStateException(message: String) extends Exception(mes
  * as well as classes created by the interpreter when the user types in code. This is just a wrapper
  * around a Jetty server.
  */
-private[spark] class HttpServer(resourceBase: File) extends Logging {
+private[spark] class HttpServer(resourceBase: File, securityManager: SecurityManager)
+  extends Logging {
   private var server: Server = null
   private var port: Int = -1
 
@@ -59,14 +63,60 @@ private[spark] class HttpServer(resourceBase: File) extends Logging {
       server.setThreadPool(threadPool)
       val resHandler = new ResourceHandler
       resHandler.setResourceBase(resourceBase.getAbsolutePath)
+
       val handlerList = new HandlerList
       handlerList.setHandlers(Array(resHandler, new DefaultHandler))
-      server.setHandler(handlerList)
+
+      if (securityManager.isAuthenticationEnabled()) {
+        logDebug("HttpServer is using security")
+        val sh = setupSecurityHandler(securityManager)
+        // make sure we go through security handler to get resources
+        sh.setHandler(handlerList)
+        server.setHandler(sh)
+      } else {
+        logDebug("HttpServer is not using security")
+        server.setHandler(handlerList)
+      }
+
       server.start()
       port = server.getConnectors()(0).getLocalPort()
     }
   }
 
+  /**
+   * Setup Jetty to the HashLoginService using a single user with our
+   * shared secret. Configure it to use DIGEST-MD5 authentication so that the password
+   * isn't passed in plaintext.
+   */
+  private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
+    val constraint = new Constraint()
+    // use DIGEST-MD5 as the authentication mechanism
+    constraint.setName(Constraint.__DIGEST_AUTH)
+    constraint.setRoles(Array("user"))
+    constraint.setAuthenticate(true)
+    constraint.setDataConstraint(Constraint.DC_NONE)
+
+    val cm = new ConstraintMapping()
+    cm.setConstraint(constraint)
+    cm.setPathSpec("/*")
+    val sh = new ConstraintSecurityHandler()
+
+    // the hashLoginService lets us do a single user and
+    // secret right now. This could be changed to use the
+    // JAASLoginService for other options.
+    val hashLogin = new HashLoginService()
+
+    val userCred = new Password(securityMgr.getSecretKey())
+    if (userCred == null) {
+      throw new Exception("Error: secret key is null with authentication on")
+    }
+    hashLogin.putUser(securityMgr.getHttpUser(), userCred, Array("user"))
+    sh.setLoginService(hashLogin)
+    sh.setAuthenticator(new DigestAuthenticator());
+    sh.setConstraintMappings(Array(cm))
+    sh
+  }
+
   def stop() {
     if (server == null) {
       throw new ServerStateException("Server is already stopped")
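
As the diff above shows, HttpServer now takes a SecurityManager and, when authentication is on, wraps its handlers in a Jetty ConstraintSecurityHandler that does DIGEST-MD5 against a single-user HashLoginService. A minimal sketch of the internal wiring, assuming code that lives inside the org.apache.spark package (both classes are private[spark]) and an invented resource directory:

    import java.io.File
    import org.apache.spark.{HttpServer, SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "s3cr3t")   // invented secret for illustration
    val securityManager = new SecurityManager(conf)

    // With authentication enabled, start() wraps the resource handler in the
    // DIGEST-MD5 ConstraintSecurityHandler built by setupSecurityHandler().
    val server = new HttpServer(new File("/tmp/spark-httpd"), securityManager)  // hypothetical dir
    server.start()
    println("serving at " + server.uri)
    server.stop()

Files served this way are only reachable by clients that present the single HTTP user and the shared secret, which the default java.net.Authenticator installed by SecurityManager supplies on the fetch side.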
core/src/main/scala/org/apache/spark/SecurityManager.scala

Lines changed: 253 additions & 0 deletions
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.net.{Authenticator, PasswordAuthentication}
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.security.Credentials
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.deploy.SparkHadoopUtil
+
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Spark class responsible for security.
+ *
+ * In general this class should be instantiated by the SparkEnv and most components
+ * should access it from that. There are some cases where the SparkEnv hasn't been
+ * initialized yet and this class must be instantiated directly.
+ *
+ * Spark currently supports authentication via a shared secret.
+ * Authentication can be configured to be on via the 'spark.authenticate' configuration
+ * parameter. This parameter controls whether the Spark communication protocols do
+ * authentication using the shared secret. This authentication is a basic handshake to
+ * make sure both sides have the same shared secret and are allowed to communicate.
+ * If the shared secret is not identical they will not be allowed to communicate.
+ *
+ * The Spark UI can also be secured by using javax servlet filters. A user may want to
+ * secure the UI if it has data that other users should not be allowed to see. The javax
+ * servlet filter specified by the user can authenticate the user and then once the user
+ * is logged in, Spark can compare that user versus the view acls to make sure they are
+ * authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
+ * control the behavior of the acls. Note that the person who started the application
+ * always has view access to the UI.
+ *
+ * Spark does not currently support encryption after authentication.
+ *
+ * At this point spark has multiple communication protocols that need to be secured and
+ * different underlying mechanisms are used depending on the protocol:
+ *
+ * - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
+ *           Akka remoting allows you to specify a secure cookie that will be exchanged
+ *           and ensured to be identical in the connection handshake between the client
+ *           and the server. If they are not identical then the client will be refused
+ *           to connect to the server. There is no control of the underlying
+ *           authentication mechanism so its not clear if the password is passed in
+ *           plaintext or uses DIGEST-MD5 or some other mechanism.
+ *           Akka also has an option to turn on SSL, this option is not currently supported
+ *           but we could add a configuration option in the future.
+ *
+ * - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
+ *           for the HttpServer. Jetty supports multiple authentication mechanisms -
+ *           Basic, Digest, Form, Spengo, etc. It also supports multiple different login
+ *           services - Hash, JAAS, Spnego, JDBC, etc.  Spark currently uses the HashLoginService
+ *           to authenticate using DIGEST-MD5 via a single user and the shared secret.
+ *           Since we are using DIGEST-MD5, the shared secret is not passed on the wire
+ *           in plaintext.
+ *           We currently do not support SSL (https), but Jetty can be configured to use it
+ *           so we could add a configuration option for this in the future.
+ *
+ *           The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
+ *           Any clients must specify the user and password. There is a default
+ *           Authenticator installed in the SecurityManager to how it does the authentication
+ *           and in this case gets the user name and password from the request.
+ *
+ * - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
+ *           exchange messages.  For this we use the Java SASL
+ *           (Simple Authentication and Security Layer) API and again use DIGEST-MD5
+ *           as the authentication mechanism. This means the shared secret is not passed
+ *           over the wire in plaintext.
+ *           Note that SASL is pluggable as to what mechanism it uses.  We currently use
+ *           DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
+ *           Spark currently supports "auth" for the quality of protection, which means
+ *           the connection is not supporting integrity or privacy protection (encryption)
+ *           after authentication. SASL also supports "auth-int" and "auth-conf" which
+ *           SPARK could be support in the future to allow the user to specify the quality
+ *           of protection they want. If we support those, the messages will also have to
+ *           be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
+ *
+ *           Since the connectionManager does asynchronous messages passing, the SASL
+ *           authentication is a bit more complex. A ConnectionManager can be both a client
+ *           and a Server, so for a particular connection is has to determine what to do.
+ *           A ConnectionId was added to be able to track connections and is used to
+ *           match up incoming messages with connections waiting for authentication.
+ *           If its acting as a client and trying to send a message to another ConnectionManager,
+ *           it blocks the thread calling sendMessage until the SASL negotiation has occurred.
+ *           The ConnectionManager tracks all the sendingConnections using the ConnectionId
+ *           and waits for the response from the server and does the handshake.
+ *
+ * - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
+ *           can be used. Yarn requires a specific AmIpFilter be installed for security to work
+ *           properly. For non-Yarn deployments, users can write a filter to go through a
+ *           companies normal login service. If an authentication filter is in place then the
+ *           SparkUI can be configured to check the logged in user against the list of users who
+ *           have view acls to see if that user is authorized.
+ *           The filters can also be used for many different purposes. For instance filters
+ *           could be used for logging, encryption, or compression.
+ *
+ * The exact mechanisms used to generate/distributed the shared secret is deployment specific.
+ *
+ * For Yarn deployments, the secret is automatically generated using the Akka remote
+ * Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
+ * around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
+ * of protection. See the Hadoop documentation for more details. Each Spark application on Yarn
+ * gets a different shared secret. On Yarn, the Spark UI gets configured to use the Hadoop Yarn
+ * AmIpFilter which requires the user to go through the ResourceManager Proxy. That Proxy is there
+ * to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
+ * filters to do authentication. That authentication then happens via the ResourceManager Proxy
+ * and Spark will use that to do authorization against the view acls.
+ *
+ * For other Spark deployments, the shared secret must be specified via the
+ * spark.authenticate.secret config.
+ * All the nodes (Master and Workers) and the applications need to have the same shared secret.
+ * This again is not ideal as one user could potentially affect another users application.
+ * This should be enhanced in the future to provide better protection.
+ * If the UI needs to be secured the user needs to install a javax servlet filter to do the
+ * authentication. Spark will then use that user to compare against the view acls to do
+ * authorization. If not filter is in place the user is generally null and no authorization
+ * can take place.
+ */
+
+private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
+
+  // key used to store the spark secret in the Hadoop UGI
+  private val sparkSecretLookupKey = "sparkCookie"
+
+  private val authOn = sparkConf.getBoolean("spark.authenticate", false)
+  private val uiAclsOn = sparkConf.getBoolean("spark.ui.acls.enable", false)
+
+  // always add the current user and SPARK_USER to the viewAcls
+  private val aclUsers = ArrayBuffer[String](System.getProperty("user.name", ""),
+    Option(System.getenv("SPARK_USER")).getOrElse(""))
+  aclUsers ++= sparkConf.get("spark.ui.view.acls", "").split(',')
+  private val viewAcls = aclUsers.map(_.trim()).filter(!_.isEmpty).toSet
+
+  private val secretKey = generateSecretKey()
+  logInfo("SecurityManager, is authentication enabled: " + authOn +
+    " are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())
+
+  // Set our own authenticator to properly negotiate user/password for HTTP connections.
+  // This is needed by the HTTP client fetching from the HttpServer. Put here so its
+  // only set once.
+  if (authOn) {
+    Authenticator.setDefault(
+      new Authenticator() {
+        override def getPasswordAuthentication(): PasswordAuthentication = {
+          var passAuth: PasswordAuthentication = null
+          val userInfo = getRequestingURL().getUserInfo()
+          if (userInfo != null) {
+            val parts = userInfo.split(":", 2)
+            passAuth = new PasswordAuthentication(parts(0), parts(1).toCharArray())
+          }
+          return passAuth
+        }
+      }
+    )
+  }
+
+  /**
+   * Generates or looks up the secret key.
+   *
+   * The way the key is stored depends on the Spark deployment mode. Yarn
+   * uses the Hadoop UGI.
+   *
+   * For non-Yarn deployments, If the config variable is not set
+   * we throw an exception.
+   */
+  private def generateSecretKey(): String = {
+    if (!isAuthenticationEnabled) return null
+    // first check to see if the secret is already set, else generate a new one if on yarn
+    val sCookie = if (SparkHadoopUtil.get.isYarnMode) {
+      val secretKey = SparkHadoopUtil.get.getSecretKeyFromUserCredentials(sparkSecretLookupKey)
+      if (secretKey != null) {
+        logDebug("in yarn mode, getting secret from credentials")
+        return new Text(secretKey).toString
+      } else {
+        logDebug("getSecretKey: yarn mode, secret key from credentials is null")
+      }
+      val cookie = akka.util.Crypt.generateSecureCookie
+      // if we generated the secret then we must be the first so lets set it so t
+      // gets used by everyone else
+      SparkHadoopUtil.get.addSecretKeyToUserCredentials(sparkSecretLookupKey, cookie)
+      logInfo("adding secret to credentials in yarn mode")
+      cookie
+    } else {
+      // user must have set spark.authenticate.secret config
+      sparkConf.getOption("spark.authenticate.secret") match {
+        case Some(value) => value
+        case None => throw new Exception("Error: a secret key must be specified via the " +
+          "spark.authenticate.secret config")
+      }
+    }
+    sCookie
+  }
+
+  /**
+   * Check to see if Acls for the UI are enabled
+   * @return true if UI authentication is enabled, otherwise false
+   */
+  def uiAclsEnabled(): Boolean = uiAclsOn
+
+  /**
+   * Checks the given user against the view acl list to see if they have
+   * authorization to view the UI. If the UI acls must are disabled
+   * via spark.ui.acls.enable, all users have view access.
+   *
+   * @param user to see if is authorized
+   * @return true is the user has permission, otherwise false
+   */
+  def checkUIViewPermissions(user: String): Boolean = {
+    if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
+  }
+
+  /**
+   * Check to see if authentication for the Spark communication protocols is enabled
+   * @return true if authentication is enabled, otherwise false
+   */
+  def isAuthenticationEnabled(): Boolean = authOn
+
+  /**
+   * Gets the user used for authenticating HTTP connections.
+   * For now use a single hardcoded user.
+   * @return the HTTP user as a String
+   */
+  def getHttpUser(): String = "sparkHttpUser"
+
+  /**
+   * Gets the user used for authenticating SASL connections.
+   * For now use a single hardcoded user.
+   * @return the SASL user as a String
+   */
+  def getSaslUser(): String = "sparkSaslUser"
+
+  /**
+   * Gets the secret key.
+   * @return the secret key as a String if authentication is enabled, otherwise returns null
+   */
+  def getSecretKey(): String = secretKey
+}
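
Putting the new class in context, here is a hedged sketch of how its checks behave, using only methods defined in this file; the configuration values and user names are invented for illustration, and SecurityManager is private[spark], so this would run from code inside the org.apache.spark package:

    import org.apache.spark.{SecurityManager, SparkConf}

    val conf = new SparkConf()
      .set("spark.authenticate", "true")
      .set("spark.authenticate.secret", "s3cr3t")
      .set("spark.ui.acls.enable", "true")
      .set("spark.ui.view.acls", "alice,bob")

    val securityManager = new SecurityManager(conf)

    securityManager.isAuthenticationEnabled()          // true
    securityManager.getSecretKey()                     // "s3cr3t", taken from spark.authenticate.secret
    securityManager.checkUIViewPermissions("alice")    // true, listed in spark.ui.view.acls
    securityManager.checkUIViewPermissions("mallory")  // false, not in the acls
    securityManager.checkUIViewPermissions(null)       // true, no filter means no user to check

Note that the user who started the application (user.name) and SPARK_USER are always added to the view acls, so they can view the UI regardless of spark.ui.view.acls.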
