Merged
4 changes: 3 additions & 1 deletion .rat-excludes
@@ -39,4 +39,6 @@ work
.*\.q
golden
test.out/*
.*iml
.*iml
python/metastore/service.properties
python/metastore/db.lck
4 changes: 2 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -28,9 +28,9 @@ class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializ
class TestMessage(val targetId: String) extends Message[String] with Serializable

class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {

var sc: SparkContext = _

after {
if (sc != null) {
sc.stop()
@@ -24,4 +24,4 @@
*/
public interface FlatMapFunction<T, R> extends Serializable {
public Iterable<R> call(T t) throws Exception;
}
}
@@ -24,4 +24,4 @@
*/
public interface FlatMapFunction2<T1, T2, R> extends Serializable {
public Iterable<R> call(T1 t1, T2 t2) throws Exception;
}
}
9 changes: 9 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -78,3 +78,12 @@ table.sortable thead {
background-repeat: repeat-x;
filter: progid:dximagetransform.microsoft.gradient(startColorstr='#FFA4EDFF', endColorstr='#FF94DDFF', GradientType=0);
}

span.kill-link {
margin-right: 2px;
color: gray;
}

span.kill-link a {
color: gray;
}
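
The two span.kill-link rules above style a new "kill" control in the web UI. As a purely illustrative sketch (not taken from this change), Spark's UI pages are built from Scala XML literals, so markup matching these selectors would look roughly like the following; the href, parameter names, and stage id are placeholders.

// Hypothetical rendering of a kill control; the "kill-link" class is what
// the new CSS rules above match. The URL and parameters are placeholders.
val stageId = 3
val killLink =
  <span class="kill-link">
    (<a href={"/stages/stage/kill/?id=" + stageId + "&terminate=true"}>kill</a>)
  </span>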
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -24,13 +24,13 @@ import com.google.common.io.Files
import org.apache.spark.util.Utils

private[spark] class HttpFileServer(securityManager: SecurityManager) extends Logging {

var baseDir : File = null
var fileDir : File = null
var jarDir : File = null
var httpServer : HttpServer = null
var serverUri : String = null

def initialize() {
baseDir = Utils.createTempDir()
fileDir = new File(baseDir, "files")
@@ -43,24 +43,24 @@ private[spark] class HttpFileServer(securityManager: SecurityManager) extends Lo
serverUri = httpServer.uri
logDebug("HTTP file server started at: " + serverUri)
}

def stop() {
httpServer.stop()
}

def addFile(file: File) : String = {
addFileToDir(file, fileDir)
serverUri + "/files/" + file.getName
}

def addJar(file: File) : String = {
addFileToDir(file, jarDir)
serverUri + "/jars/" + file.getName
}

def addFileToDir(file: File, dir: File) : String = {
Files.copy(file, new File(dir, file.getName))
dir + "/" + file.getName
}

}
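
HttpFileServer is a private[spark] helper, so the snippet below is only an internal-usage sketch: it assumes code living in the org.apache.spark package, a SecurityManager built from a fresh SparkConf, and placeholder file paths.

import java.io.File
import org.apache.spark.{SecurityManager, SparkConf}

// Sketch: serve a local file and a jar over HTTP, then shut the server down.
val fileServer = new HttpFileServer(new SecurityManager(new SparkConf()))
fileServer.initialize()                                      // creates temp dirs and starts the HTTP server
val fileUrl = fileServer.addFile(new File("/tmp/data.txt"))  // -> <serverUri>/files/data.txt
val jarUrl  = fileServer.addJar(new File("/tmp/app.jar"))    // -> <serverUri>/jars/app.jar
fileServer.stop()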
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -83,19 +83,19 @@ private[spark] class HttpServer(resourceBase: File, securityManager: SecurityMan
}
}

/**
/**
* Setup Jetty to the HashLoginService using a single user with our
* shared secret. Configure it to use DIGEST-MD5 authentication so that the password
* isn't passed in plaintext.
*/
private def setupSecurityHandler(securityMgr: SecurityManager): ConstraintSecurityHandler = {
val constraint = new Constraint()
// use DIGEST-MD5 as the authentication mechanism
// use DIGEST-MD5 as the authentication mechanism
constraint.setName(Constraint.__DIGEST_AUTH)
constraint.setRoles(Array("user"))
constraint.setAuthenticate(true)
constraint.setDataConstraint(Constraint.DC_NONE)

val cm = new ConstraintMapping()
cm.setConstraint(constraint)
cm.setPathSpec("/*")
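
The hunk stops before the login-service half of setupSecurityHandler. As a hedged sketch only, the remainder of such a handler is typically assembled from Jetty's security API roughly as follows; the user name and sharedSecret value are placeholders, not necessarily what Spark uses.

import org.eclipse.jetty.security.{ConstraintSecurityHandler, HashLoginService}
import org.eclipse.jetty.security.authentication.DigestAuthenticator
import org.eclipse.jetty.util.security.Credential

// Continuing from the ConstraintMapping (cm) above: register a single
// user/secret pair and require DIGEST authentication for every path.
val sharedSecret = "placeholder-secret"
val hashLogin = new HashLoginService()
hashLogin.putUser("sparkHttpUser", Credential.getCredential(sharedSecret), Array("user"))

val sh = new ConstraintSecurityHandler()
sh.setLoginService(hashLogin)
sh.setAuthenticator(new DigestAuthenticator())
sh.addConstraintMapping(cm)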
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Partition.scala
@@ -25,7 +25,7 @@ trait Partition extends Serializable {
* Get the split's index within its parent RDD
*/
def index: Int

// A better default implementation of HashCode
override def hashCode(): Int = index
}
88 changes: 44 additions & 44 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -25,93 +25,93 @@ import org.apache.hadoop.io.Text

import org.apache.spark.deploy.SparkHadoopUtil

/**
* Spark class responsible for security.
*
/**
* Spark class responsible for security.
*
* In general this class should be instantiated by the SparkEnv and most components
* should access it from that. There are some cases where the SparkEnv hasn't been
* should access it from that. There are some cases where the SparkEnv hasn't been
* initialized yet and this class must be instantiated directly.
*
*
* Spark currently supports authentication via a shared secret.
* Authentication can be configured to be on via the 'spark.authenticate' configuration
* parameter. This parameter controls whether the Spark communication protocols do
* parameter. This parameter controls whether the Spark communication protocols do
* authentication using the shared secret. This authentication is a basic handshake to
* make sure both sides have the same shared secret and are allowed to communicate.
* If the shared secret is not identical they will not be allowed to communicate.
*
* The Spark UI can also be secured by using javax servlet filters. A user may want to
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* If the shared secret is not identical they will not be allowed to communicate.
*
* The Spark UI can also be secured by using javax servlet filters. A user may want to
* secure the UI if it has data that other users should not be allowed to see. The javax
* servlet filter specified by the user can authenticate the user and then once the user
* is logged in, Spark can compare that user versus the view acls to make sure they are
* authorized to view the UI. The configs 'spark.ui.acls.enable' and 'spark.ui.view.acls'
* control the behavior of the acls. Note that the person who started the application
* always has view access to the UI.
*
* Spark does not currently support encryption after authentication.
*
*
* At this point spark has multiple communication protocols that need to be secured and
* different underlying mechanisms are used depending on the protocol:
*
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* - Akka -> The only option here is to use the Akka Remote secure-cookie functionality.
* Akka remoting allows you to specify a secure cookie that will be exchanged
* and ensured to be identical in the connection handshake between the client
* and the server. If they are not identical then the client will be refused
* to connect to the server. There is no control of the underlying
* authentication mechanism so its not clear if the password is passed in
* plaintext or uses DIGEST-MD5 or some other mechanism.
* Akka also has an option to turn on SSL, this option is not currently supported
* but we could add a configuration option in the future.
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
*
* - HTTP for broadcast and file server (via HttpServer) -> Spark currently uses Jetty
* for the HttpServer. Jetty supports multiple authentication mechanisms -
* Basic, Digest, Form, Spengo, etc. It also supports multiple different login
* services - Hash, JAAS, Spnego, JDBC, etc. Spark currently uses the HashLoginService
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* to authenticate using DIGEST-MD5 via a single user and the shared secret.
* Since we are using DIGEST-MD5, the shared secret is not passed on the wire
* in plaintext.
* We currently do not support SSL (https), but Jetty can be configured to use it
* so we could add a configuration option for this in the future.
*
*
* The Spark HttpServer installs the HashLoginServer and configures it to DIGEST-MD5.
* Any clients must specify the user and password. There is a default
* Any clients must specify the user and password. There is a default
* Authenticator installed in the SecurityManager to how it does the authentication
* and in this case gets the user name and password from the request.
*
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* - ConnectionManager -> The Spark ConnectionManager uses java nio to asynchronously
* exchange messages. For this we use the Java SASL
* (Simple Authentication and Security Layer) API and again use DIGEST-MD5
* as the authentication mechanism. This means the shared secret is not passed
* over the wire in plaintext.
* Note that SASL is pluggable as to what mechanism it uses. We currently use
* DIGEST-MD5 but this could be changed to use Kerberos or other in the future.
* Spark currently supports "auth" for the quality of protection, which means
* the connection is not supporting integrity or privacy protection (encryption)
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* after authentication. SASL also supports "auth-int" and "auth-conf" which
* SPARK could be support in the future to allow the user to specify the quality
* of protection they want. If we support those, the messages will also have to
* of protection they want. If we support those, the messages will also have to
* be wrapped and unwrapped via the SaslServer/SaslClient.wrap/unwrap API's.
*
* Since the connectionManager does asynchronous messages passing, the SASL
*
* Since the connectionManager does asynchronous messages passing, the SASL
* authentication is a bit more complex. A ConnectionManager can be both a client
* and a Server, so for a particular connection is has to determine what to do.
* A ConnectionId was added to be able to track connections and is used to
* A ConnectionId was added to be able to track connections and is used to
* match up incoming messages with connections waiting for authentication.
* If its acting as a client and trying to send a message to another ConnectionManager,
* it blocks the thread calling sendMessage until the SASL negotiation has occurred.
* The ConnectionManager tracks all the sendingConnections using the ConnectionId
* and waits for the response from the server and does the handshake.
*
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* - HTTP for the Spark UI -> the UI was changed to use servlets so that javax servlet filters
* can be used. Yarn requires a specific AmIpFilter be installed for security to work
* properly. For non-Yarn deployments, users can write a filter to go through a
* companies normal login service. If an authentication filter is in place then the
* SparkUI can be configured to check the logged in user against the list of users who
* have view acls to see if that user is authorized.
* The filters can also be used for many different purposes. For instance filters
* The filters can also be used for many different purposes. For instance filters
* could be used for logging, encryption, or compression.
*
*
* The exact mechanisms used to generate/distributed the shared secret is deployment specific.
*
*
* For Yarn deployments, the secret is automatically generated using the Akka remote
* Crypt.generateSecureCookie() API. The secret is placed in the Hadoop UGI which gets passed
* around via the Hadoop RPC mechanism. Hadoop RPC can be configured to support different levels
@@ -121,7 +121,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
* to reduce the possibility of web based attacks through YARN. Hadoop can be configured to use
* filters to do authentication. That authentication then happens via the ResourceManager Proxy
* and Spark will use that to do authorization against the view acls.
*
*
* For other Spark deployments, the shared secret must be specified via the
* spark.authenticate.secret config.
* All the nodes (Master and Workers) and the applications need to have the same shared secret.
@@ -152,7 +152,7 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
" are ui acls enabled: " + uiAclsOn + " users with view permissions: " + viewAcls.toString())

// Set our own authenticator to properly negotiate user/password for HTTP connections.
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// This is needed by the HTTP client fetching from the HttpServer. Put here so its
// only set once.
if (authOn) {
Authenticator.setDefault(
@@ -214,12 +214,12 @@ private[spark] class SecurityManager(sparkConf: SparkConf) extends Logging {
def uiAclsEnabled(): Boolean = uiAclsOn

/**
* Checks the given user against the view acl list to see if they have
* Checks the given user against the view acl list to see if they have
* authorization to view the UI. If the UI acls must are disabled
* via spark.ui.acls.enable, all users have view access.
*
*
* @param user to see if is authorized
* @return true is the user has permission, otherwise false
* @return true is the user has permission, otherwise false
*/
def checkUIViewPermissions(user: String): Boolean = {
if (uiAclsEnabled() && (user != null) && (!viewAcls.contains(user))) false else true
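
The class doc above says a default java.net.Authenticator is installed so HTTP clients fetching from the secured HttpServer can answer the DIGEST challenge, and the truncated hunk shows the Authenticator.setDefault call. A minimal sketch of that pattern, with placeholder user and secret values:

import java.net.{Authenticator, PasswordAuthentication}

// Process-wide authenticator: hands the shared secret to any HTTP request
// that is challenged for credentials. User name and secret are placeholders.
val sharedSecret = "placeholder-secret"
Authenticator.setDefault(new Authenticator() {
  override def getPasswordAuthentication(): PasswordAuthentication =
    new PasswordAuthentication("sparkHttpUser", sharedSecret.toCharArray)
})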
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1137,6 +1137,16 @@ class SparkContext(config: SparkConf) extends Logging {
dagScheduler.cancelAllJobs()
}

/** Cancel a given job if it's scheduled or running */
private[spark] def cancelJob(jobId: Int) {
dagScheduler.cancelJob(jobId)
}

/** Cancel a given stage and all jobs associated with it */
private[spark] def cancelStage(stageId: Int) {
dagScheduler.cancelStage(stageId)
}

/**
* Clean a closure to make it ready to serialized and send to tasks
* (removes unreferenced variables in $outer's, updates REPL variables)
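
Both new methods are private[spark], so they are meant for internal callers such as the web UI's kill handler rather than user applications. A hedged sketch of how such a caller (necessarily inside the org.apache.spark package) might use them follows; the parameter handling is illustrative only.

// Hypothetical kill handler: parse the stage id from a request parameter
// and ask the SparkContext to cancel that stage (and its associated jobs).
def handleKillRequest(sc: SparkContext, stageIdParam: String): Unit = {
  val stageId = stageIdParam.toInt
  sc.cancelStage(stageId)
}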
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkException.scala
@@ -20,5 +20,5 @@ package org.apache.spark
class SparkException(message: String, cause: Throwable)
extends Exception(message, cause) {

def this(message: String) = this(message, null)
def this(message: String) = this(message, null)
}
20 changes: 10 additions & 10 deletions core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -42,7 +42,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

private val now = new Date()
private val conf = new SerializableWritable(jobConf)

private var jobID = 0
private var splitID = 0
private var attemptID = 0
@@ -58,8 +58,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
def preSetup() {
setIDs(0, 0, 0)
HadoopRDD.addLocalConfiguration("", 0, 0, 0, conf.value)
val jCtxt = getJobContext()

val jCtxt = getJobContext()
getOutputCommitter().setupJob(jCtxt)
}

@@ -74,7 +74,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
val numfmt = NumberFormat.getInstance()
numfmt.setMinimumIntegerDigits(5)
numfmt.setGroupingUsed(false)

val outputName = "part-" + numfmt.format(splitID)
val path = FileOutputFormat.getOutputPath(conf.value)
val fs: FileSystem = {
@@ -85,7 +85,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}
}

getOutputCommitter().setupTask(getTaskContext())
getOutputCommitter().setupTask(getTaskContext())
writer = getOutputFormat().getRecordWriter(fs, conf.value, outputName, Reporter.NULL)
}

@@ -103,18 +103,18 @@ class SparkHadoopWriter(@transient jobConf: JobConf)

def commit() {
val taCtxt = getTaskContext()
val cmtr = getOutputCommitter()
val cmtr = getOutputCommitter()
if (cmtr.needsTaskCommit(taCtxt)) {
try {
cmtr.commitTask(taCtxt)
logInfo (taID + ": Committed")
} catch {
case e: IOException => {
case e: IOException => {
logError("Error committing the output of task: " + taID.value, e)
cmtr.abortTask(taCtxt)
throw e
}
}
}
} else {
logWarning ("No need to commit output of task: " + taID.value)
}
@@ -144,7 +144,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
}

private def getJobContext(): JobContext = {
if (jobContext == null) {
if (jobContext == null) {
jobContext = newJobContext(conf.value, jID.value)
}
jobContext
@@ -175,7 +175,7 @@ object SparkHadoopWriter {
val jobtrackerID = formatter.format(time)
new JobID(jobtrackerID, id)
}

def createPathFromString(path: String, conf: JobConf): Path = {
if (path == null) {
throw new IllegalArgumentException("Output path is null")