
Commit 915bc52

Added support for accessing secured HDFS

1 parent 0d1cc4a commit 915bc52

6 files changed: 244 additions, 7 deletions

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 0 deletions
@@ -227,6 +227,11 @@ class SparkContext(config: SparkConf) extends Logging {
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)

+  // Need to do security authentication when Hadoop security is turned on
+  if (SparkHadoopUtil.get.isSecurityEnabled()) {
+    SparkHadoopUtil.get.doUserAuthentication(this)
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 173 additions & 1 deletion
@@ -18,13 +18,19 @@
 package org.apache.spark.deploy

 import java.security.PrivilegedExceptionAction
+import java.util.{Collection, TimerTask, Timer}
+import java.io.{File, IOException}
+import java.net.URI

+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.security.token.{TokenIdentifier, Token}
+import org.apache.hadoop.fs.permission.FsPermission
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.security.Credentials
 import org.apache.hadoop.security.UserGroupInformation

-import org.apache.spark.{Logging, SparkContext, SparkConf, SparkException}
+import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi

 import scala.collection.JavaConversions._
@@ -38,6 +44,8 @@ class SparkHadoopUtil extends Logging {
   val conf: Configuration = newConfiguration(new SparkConf())
   UserGroupInformation.setConfiguration(conf)

+  val sparkConf = new SparkConf()
+
   /**
    * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
    * (distributed to child threads), used for authenticating HDFS and YARN calls.
@@ -117,6 +125,170 @@ class SparkHadoopUtil extends Logging {

   def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

+  /**
+   * Return whether Hadoop security is enabled or not.
+   *
+   * @return Whether Hadoop security is enabled or not
+   */
+  def isSecurityEnabled(): Boolean = {
+    UserGroupInformation.isSecurityEnabled
+  }
+
+  /**
+   * Do user authentication when Hadoop security is turned on. Used by the driver.
+   *
+   * @param sc Spark context
+   */
+  def doUserAuthentication(sc: SparkContext) {
+    getAuthenticationType match {
+      case "keytab" => {
+        // Authentication through a Kerberos keytab file. Necessary for
+        // long-running services like Shark/Spark Streaming.
+        scheduleKerberosRenewTask(sc)
+      }
+      case _ => {
+        // No authentication needed. Assuming authentication is already done
+        // before Spark is launched, e.g., the user has authenticated with
+        // Kerberos through kinit already.
+        // Obtain a Hadoop delegation token and store it in a file.
+        // Add the token file so it gets downloaded by every slave node.
+        sc.addFile(initDelegationToken().toString)
+      }
+    }
+  }
+
+  /**
+   * Get the user to whom the task belongs.
+   *
+   * @param userName Name of the user to whom the task belongs
+   * @return The UserGroupInformation of the user to whom the task belongs
+   */
+  def getTaskUser(userName: String): UserGroupInformation = {
+    val ugi = UserGroupInformation.createRemoteUser(userName)
+    // Change the authentication method to Kerberos
+    ugi.setAuthenticationMethod(
+      UserGroupInformation.AuthenticationMethod.KERBEROS)
+    // Get and add Hadoop delegation tokens for the user
+    val iter = getDelegationTokens().iterator()
+    while (iter.hasNext) {
+      ugi.addToken(iter.next())
+    }
+
+    ugi
+  }
+
+  /**
+   * Get the type of Hadoop security authentication.
+   *
+   * @return Type of Hadoop security authentication
+   */
+  private def getAuthenticationType: String = {
+    sparkConf.get("spark.hadoop.security.authentication")
+  }
+
+  /**
+   * Schedule a timer task for automatically renewing the Kerberos credential.
+   *
+   * @param sc Spark context
+   */
+  private def scheduleKerberosRenewTask(sc: SparkContext): Unit = {
+    val kerberosRenewTimer = new Timer()
+    val kerberosRenewTimerTask = new TimerTask {
+      def run(): Unit = {
+        try {
+          kerberosLoginFromKeytab()
+          // Renew a Hadoop delegation token and store the token in a file.
+          // Add the token file so it gets downloaded by every slave node.
+          sc.addFile(initDelegationToken().toString)
+        } catch {
+          case ioe: IOException => {
+            logError("Failed to login from Kerberos keytab", ioe)
+          }
+        }
+      }
+    }
+
+    val interval = sparkConf.getLong(
+      "spark.hadoop.security.kerberos.renewInterval", 21600000)
+    kerberosRenewTimer.schedule(kerberosRenewTimerTask, 0, interval)
+    logInfo("Scheduled timer task for renewing the Kerberos credential")
+  }
+
+  /**
+   * Log a user in from a keytab file. Loads user credentials from a keytab
+   * file and logs the user in.
+   */
+  private def kerberosLoginFromKeytab(): Unit = {
+    val user = System.getProperty("user.name")
+    val home = System.getProperty("user.home")
+    val defaultKeytab = home + Path.SEPARATOR + user + ".keytab"
+    val keytab = sparkConf.get(
+      "spark.hadoop.security.kerberos.keytab", defaultKeytab)
+      .replaceAll("_USER", user).replaceAll("_HOME", home)
+    val principal = sparkConf.get(
+      "spark.hadoop.security.kerberos.principal", user).replaceAll("_USER", user)
+      .replaceAll("_HOME", home)
+
+    // Keytab file not found
+    if (!new File(keytab).exists()) {
+      throw new IOException("Keytab file %s not found".format(keytab))
+    }
+
+    loginUserFromKeytab(principal, keytab)
+  }
+
+  /**
+   * Initialize a Hadoop delegation token, store the token in a file,
+   * and add it to the SparkContext so executors can get it.
+   *
+   * @return URI of the token file
+   */
+  private def initDelegationToken(): URI = {
+    val localFS = FileSystem.getLocal(conf)
+    // Store the token file under the user's home directory
+    val tokenFile = new Path(localFS.getHomeDirectory, sparkConf.get(
+      "spark.hadoop.security.token.name", "spark.token"))
+    if (localFS.exists(tokenFile)) {
+      localFS.delete(tokenFile, false)
+    }
+
+    // Get a new token and write it to the given token file
+    val currentUser = UserGroupInformation.getCurrentUser
+    val fs = FileSystem.get(conf)
+    val token: Token[_ <: TokenIdentifier] =
+      fs.getDelegationToken(currentUser.getShortUserName)
+        .asInstanceOf[Token[_ <: TokenIdentifier]]
+    val cred = new Credentials()
+    cred.addToken(token.getService, token)
+    cred.writeTokenStorageFile(tokenFile, conf)
+    // Make sure the token file is readable only by the owner
+    localFS.setPermission(tokenFile, FsPermission.createImmutable(0400))
+
+    logInfo("Stored Hadoop delegation token for user %s to file %s".format(
+      currentUser.getShortUserName, tokenFile.toUri.toString))
+    tokenFile.toUri
+  }
+
+  /**
+   * Get delegation tokens from the token file added through SparkContext.addFile().
+   *
+   * @return Collection of delegation tokens
+   */
+  private def getDelegationTokens(): Collection[Token[_ <: TokenIdentifier]] = {
+    // Get the token file added through SparkContext.addFile()
+    val source = new File(SparkFiles.get(sparkConf.get(
+      "spark.hadoop.security.token.name", "spark.token")))
+    if (source.exists()) {
+      val sourcePath = new Path("file://" + source.getAbsolutePath)
+      // Read credentials from the token file
+      Credentials.readTokenStorageFile(sourcePath, conf).getAllTokens
+    } else {
+      throw new IOException(
+        "Token file %s does not exist".format(source.getAbsolutePath))
+    }
+  }
+
   def loginUserFromKeytab(principalName: String, keytabFilename: String) {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }
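
To make the token-file mechanism above concrete, here is a minimal, self-contained sketch of the write/read round trip that initDelegationToken() and getDelegationTokens() perform, using only the Hadoop Credentials API. The dummy token bytes, the object name, and the /tmp path are illustrative assumptions; on a real Kerberized cluster the token would come from FileSystem.getDelegationToken().

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

import scala.collection.JavaConversions._

object TokenFileRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val localFS = FileSystem.getLocal(conf)
    val tokenFile = new Path("/tmp/spark.token") // hypothetical location

    // "Driver" side: put a token into a Credentials object and persist it.
    val dummyToken = new Token[TokenIdentifier](
      "id".getBytes("UTF-8"), "password".getBytes("UTF-8"),
      new Text("HDFS_DELEGATION_TOKEN"), new Text("dummy-service"))
    val cred = new Credentials()
    cred.addToken(dummyToken.getService, dummyToken)
    cred.writeTokenStorageFile(tokenFile, conf)

    // "Executor" side: read the same file back and list the tokens it contains.
    val restored = Credentials.readTokenStorageFile(tokenFile, conf)
    for (t <- restored.getAllTokens) {
      println("kind=%s service=%s".format(t.getKind, t.getService))
    }

    localFS.delete(tokenFile, false)
  }
}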

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 16 additions & 2 deletions
@@ -21,6 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.nio.ByteBuffer
 import java.util.concurrent._
+import java.security.PrivilegedExceptionAction

 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -158,7 +159,8 @@ private[spark] class Executor(
       try {
         SparkEnv.set(env)
         Accumulators.clear()
-        val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
+        val (userName, taskFiles, taskJars, taskBytes) =
+          Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

@@ -178,7 +180,19 @@

         // Run the actual task and measure its runtime.
         taskStart = System.currentTimeMillis()
-        val value = task.run(taskId.toInt)
+        var value: Any = None
+        if (SparkHadoopUtil.get.isSecurityEnabled()) {
+          // Get the user to whom the task belongs
+          val ugi = SparkHadoopUtil.get.getTaskUser(userName)
+          // Run the task as the user to whom the task belongs
+          ugi.doAs(new PrivilegedExceptionAction[Unit] {
+            def run(): Unit = {
+              value = task.run(taskId.toInt)
+            }
+          })
+        } else {
+          value = task.run(taskId.toInt)
+        }
         val taskFinish = System.currentTimeMillis()

         // If the task has been killed, let's fail it.
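
The change above hinges on UserGroupInformation.doAs(): the executor builds a UGI for the submitting user and runs the task body under that identity. Below is a minimal sketch of just that pattern, outside of Spark; the user name "alice" is an illustrative assumption, whereas in the patch the name arrives with the serialized task.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object RunAsUserSketch {
  def main(args: Array[String]): Unit = {
    // Create a UGI for a hypothetical remote user.
    val ugi = UserGroupInformation.createRemoteUser("alice")
    val whoRan = ugi.doAs(new PrivilegedExceptionAction[String] {
      def run(): String = {
        // Hadoop client calls made in here are attributed to the UGI's user,
        // authenticated by whatever delegation tokens were added to it.
        UserGroupInformation.getCurrentUser.getUserName
      }
    })
    println("Task body ran as: " + whoRan)
  }
}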

core/src/main/scala/org/apache/spark/scheduler/Task.scala

Lines changed: 10 additions & 3 deletions
@@ -107,6 +107,7 @@ private[spark] object Task {
   * Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
   */
  def serializeWithDependencies(
+      userName: String,
      task: Task[_],
      currentFiles: HashMap[String, Long],
      currentJars: HashMap[String, Long],
@@ -116,6 +117,9 @@
    val out = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(out)

+    // Write the name of the user launching the task
+    dataOut.writeUTF(userName)
+
    // Write currentFiles
    dataOut.writeInt(currentFiles.size)
    for ((name, timestamp) <- currentFiles) {
@@ -142,14 +146,17 @@
   * and return the task itself as a serialized ByteBuffer. The caller can then update its
   * ClassLoaders and deserialize the task.
   *
-   * @return (taskFiles, taskJars, taskBytes)
+   * @return (userName, taskFiles, taskJars, taskBytes)
   */
  def deserializeWithDependencies(serializedTask: ByteBuffer)
-    : (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
+    : (String, HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {

    val in = new ByteBufferInputStream(serializedTask)
    val dataIn = new DataInputStream(in)

+    // Read the name of the user launching the task
+    val userName = dataIn.readUTF()
+
    // Read task's files
    val taskFiles = new HashMap[String, Long]()
    val numFiles = dataIn.readInt()
@@ -166,6 +173,6 @@

    // Create a sub-buffer for the rest of the data, which is the serialized Task object
    val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
-    (taskFiles, taskJars, subBuffer)
+    (userName, taskFiles, taskJars, subBuffer)
  }
}
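
Because writeUTF/readUTF framing must stay symmetric, the user name now has to be consumed before the file and JAR maps on the deserializing side. A stripped-down sketch of that ordering, independent of Spark (the payload values are illustrative):

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object UserNameFramingSketch {
  def main(args: Array[String]): Unit = {
    // Writer side: user name first, then the rest of the dependency metadata.
    val out = new ByteArrayOutputStream(4096)
    val dataOut = new DataOutputStream(out)
    dataOut.writeUTF("alice") // user name, written first as in serializeWithDependencies
    dataOut.writeInt(0)       // e.g. number of files that would follow
    dataOut.flush()

    // Reader side: fields must be read in exactly the same order.
    val dataIn = new DataInputStream(new ByteArrayInputStream(out.toByteArray))
    val userName = dataIn.readUTF()
    val numFiles = dataIn.readInt()
    println("userName=%s numFiles=%d".format(userName, numFiles))
  }
}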

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 2 additions & 1 deletion
@@ -433,10 +433,11 @@ private[spark] class TaskSetManager(
       }
       // Serialize and return the task
       val startTime = clock.getTime()
+      val userName = System.getProperty("user.name")
       // We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
       // we assume the task can be serialized without exceptions.
       val serializedTask = Task.serializeWithDependencies(
-        task, sched.sc.addedFiles, sched.sc.addedJars, ser)
+        userName, task, sched.sc.addedFiles, sched.sc.addedJars, ser)
       if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
         !emittedTaskSizeWarning) {
         emittedTaskSizeWarning = true

docs/configuration.md

Lines changed: 38 additions & 0 deletions
@@ -722,6 +722,44 @@ Apart from these, the following properties are also available, and may be useful
     Number of cores to allocate for each task.
   </td>
 </tr>
+<tr>
+  <td><code>spark.hadoop.security.authentication</code></td>
+  <td>(none)</td>
+  <td>
+    Method used for authenticating the user when Hadoop security is turned on. A Hadoop delegation token
+    can be obtained only after the user is authenticated.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.hadoop.security.kerberos.renewInterval</code></td>
+  <td>21600000</td>
+  <td>
+    Interval in milliseconds for automatically renewing the Kerberos credential when Hadoop security is
+    turned on and Kerberos is the method for user authentication.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.hadoop.security.kerberos.keytab</code></td>
+  <td>{Current login user name}.keytab under the home directory of the current login user</td>
+  <td>
+    Local path of the Kerberos keytab file. The keytab is usually located on the gateway host to the Spark
+    cluster. The "_USER" and "_HOME" variables are available.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.hadoop.security.kerberos.principal</code></td>
+  <td>Current login user name</td>
+  <td>
+    Principal used for Kerberos login. The "_USER" and "_HOME" variables are available.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.hadoop.security.token.name</code></td>
+  <td>spark.token</td>
+  <td>
+    Name of the file storing the Hadoop delegation token obtained by the driver.
+  </td>
+</tr>
 <tr>
   <td><code>spark.task.maxFailures</code></td>
   <td>4</td>