
Commit 16863ea

Merge master
2 parents: 3fc27e7 + 9765241

195 files changed: +5403 −2438 lines


.rat-excludes

Lines changed: 4 additions & 0 deletions
@@ -86,4 +86,8 @@ local-1430917381535_2
 DESCRIPTION
 NAMESPACE
 test_support/*
+.*Rd
+help/*
+html/*
+INDEX
 .lintr

LICENSE

Lines changed: 1 addition & 1 deletion
@@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
 (MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
 (MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
 (MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
-(The MIT License) Mockito (org.mockito:mockito-core:1.8.5 - http://www.mockito.org)
+(The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org)
 (MIT License) jquery (https://jquery.org/license/)
 (MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)

R/pkg/R/DataFrame.R

Lines changed: 2 additions & 2 deletions
@@ -169,8 +169,8 @@ setMethod("isLocal",
 #'}
 setMethod("showDF",
           signature(x = "DataFrame"),
-          function(x, numRows = 20) {
-            s <- callJMethod(x@sdf, "showString", numToInt(numRows))
+          function(x, numRows = 20, truncate = TRUE) {
+            s <- callJMethod(x@sdf, "showString", numToInt(numRows), truncate)
             cat(s)
           })

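The R change above simply forwards the new `truncate` flag to the JVM-side `showString` helper. As a rough standalone illustration of the behaviour being toggled (a Scala sketch, not Spark's implementation; the 20-character cutoff mirrors the default truncation but is hardcoded here):

// Illustrative only: when truncate is true, cell values longer than 20
// characters are cut to 17 characters and suffixed with "...".
object TruncateSketch {
  def formatCell(value: String, truncate: Boolean): String =
    if (truncate && value.length > 20) value.take(17) + "..." else value

  def main(args: Array[String]): Unit = {
    val cell = "a rather long string value in one column"
    println(formatCell(cell, truncate = true))   // prints "a rather long str..."
    println(formatCell(cell, truncate = false))  // prints the full value
  }
}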
core/pom.xml

Lines changed: 0 additions & 10 deletions
@@ -69,16 +69,6 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
-      <exclusions>
-        <exclusion>
-          <groupId>javax.servlet</groupId>
-          <artifactId>servlet-api</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>org.codehaus.jackson</groupId>
-          <artifactId>jackson-mapper-asl</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

core/src/main/scala/org/apache/spark/SSLOptions.scala

Lines changed: 38 additions & 5 deletions
@@ -17,7 +17,9 @@
 
 package org.apache.spark
 
-import java.io.File
+import java.io.{File, FileInputStream}
+import java.security.{KeyStore, NoSuchAlgorithmException}
+import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory}
 
 import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.eclipse.jetty.util.ssl.SslContextFactory
@@ -38,7 +40,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory
  * @param trustStore a path to the trust-store file
  * @param trustStorePassword a password to access the trust-store file
  * @param protocol SSL protocol (remember that SSLv3 was compromised) supported by Java
- * @param enabledAlgorithms a set of encryption algorithms to use
+ * @param enabledAlgorithms a set of encryption algorithms that may be used
  */
 private[spark] case class SSLOptions(
     enabled: Boolean = false,
@@ -48,7 +50,8 @@ private[spark] case class SSLOptions(
     trustStore: Option[File] = None,
     trustStorePassword: Option[String] = None,
     protocol: Option[String] = None,
-    enabledAlgorithms: Set[String] = Set.empty) {
+    enabledAlgorithms: Set[String] = Set.empty)
+    extends Logging {
 
   /**
    * Creates a Jetty SSL context factory according to the SSL settings represented by this object.
@@ -63,7 +66,7 @@
       trustStorePassword.foreach(sslContextFactory.setTrustStorePassword)
       keyPassword.foreach(sslContextFactory.setKeyManagerPassword)
       protocol.foreach(sslContextFactory.setProtocol)
-      sslContextFactory.setIncludeCipherSuites(enabledAlgorithms.toSeq: _*)
+      sslContextFactory.setIncludeCipherSuites(supportedAlgorithms.toSeq: _*)
 
       Some(sslContextFactory)
     } else {
@@ -94,14 +97,44 @@
         .withValue("akka.remote.netty.tcp.security.protocol",
           ConfigValueFactory.fromAnyRef(protocol.getOrElse("")))
         .withValue("akka.remote.netty.tcp.security.enabled-algorithms",
-          ConfigValueFactory.fromIterable(enabledAlgorithms.toSeq))
+          ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq))
         .withValue("akka.remote.netty.tcp.enable-ssl",
           ConfigValueFactory.fromAnyRef(true)))
     } else {
       None
     }
   }
 
+  /*
+   * The supportedAlgorithms set is a subset of the enabledAlgorithms that
+   * are supported by the current Java security provider for this protocol.
+   */
+  private val supportedAlgorithms: Set[String] = {
+    var context: SSLContext = null
+    try {
+      context = SSLContext.getInstance(protocol.orNull)
+      /* The set of supported algorithms does not depend upon the keys, trust, or
+         rng, although they will influence which algorithms are eventually used. */
+      context.init(null, null, null)
+    } catch {
+      case npe: NullPointerException =>
+        logDebug("No SSL protocol specified")
+        context = SSLContext.getDefault
+      case nsa: NoSuchAlgorithmException =>
+        logDebug(s"No support for requested SSL protocol ${protocol.get}")
+        context = SSLContext.getDefault
+    }
+
+    val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet
+
+    // Log which algorithms we are discarding
+    (enabledAlgorithms &~ providerAlgorithms).foreach { cipher =>
+      logDebug(s"Discarding unsupported cipher $cipher")
+    }
+
+    enabledAlgorithms & providerAlgorithms
+  }
+
   /** Returns a string representation of this SSLOptions with all the passwords masked. */
   override def toString: String = s"SSLOptions{enabled=$enabled, " +
     s"keyStore=$keyStore, keyStorePassword=${keyStorePassword.map(_ => "xxx")}, " +

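The new `supportedAlgorithms` value intersects the configured cipher suites with what the JVM's security provider can actually offer. A minimal standalone Scala sketch of the same idea (the object name and cipher list are invented for illustration, not Spark defaults):

import javax.net.ssl.SSLContext

object CipherFilterSketch {
  def main(args: Array[String]): Unit = {
    // Ask the default provider which cipher suites it supports for TLS.
    val context = SSLContext.getInstance("TLS")
    // Keys, trust store and RNG do not affect the supported suite list.
    context.init(null, null, null)
    val providerAlgorithms = context.getServerSocketFactory.getSupportedCipherSuites.toSet

    // Hypothetical configured set; only the intersection would actually be enabled.
    val enabledAlgorithms = Set("TLS_RSA_WITH_AES_128_CBC_SHA", "SOME_UNSUPPORTED_CIPHER")

    (enabledAlgorithms &~ providerAlgorithms).foreach(c => println(s"Discarding unsupported cipher $c"))
    println(s"Using: ${enabledAlgorithms & providerAlgorithms}")
  }
}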
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 13 additions & 3 deletions
@@ -315,6 +315,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _dagScheduler = ds
   }
 
+  /**
+   * A unique identifier for the Spark application.
+   * Its format depends on the scheduler implementation.
+   * (i.e.
+   *  in case of local spark app something like 'local-1433865536131'
+   *  in case of YARN something like 'application_1433865536131_34483'
+   * )
+   */
   def applicationId: String = _applicationId
   def applicationAttemptId: Option[String] = _applicationAttemptId
 
@@ -823,7 +831,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * }}}
    *
    * @note Small files are preferred, large file is also allowable, but may cause bad performance.
-   *
+   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files
+   *       in a directory rather than `.../path/` or `.../path`
    * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
   def wholeTextFiles(
@@ -870,9 +879,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * (a-hdfs-path/part-nnnnn, its content)
    * }}}
    *
-   * @param minPartitions A suggestion value of the minimal splitting number for input data.
-   *
    * @note Small files are preferred; very large files may cause bad performance.
+   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files
+   *       in a directory rather than `.../path/` or `.../path`
+   * @param minPartitions A suggestion value of the minimal splitting number for input data.
    */
   @Experimental
   def binaryFiles(

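A brief usage sketch of the two pieces documented above, `applicationId` and the wildcard form of `wholeTextFiles` (the master, path, and partition count are made-up example values):

import org.apache.spark.{SparkConf, SparkContext}

object WholeTextFilesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("sketch").setMaster("local[2]"))

    // Format depends on the cluster manager, e.g. "local-1433865536131" or "application_..._34483".
    println(s"applicationId = ${sc.applicationId}")

    // Wildcard listing; on some filesystems this is cheaper than pointing at the directory itself.
    // "/tmp/docs" stands in for any directory of small text files.
    val files = sc.wholeTextFiles("/tmp/docs/*", minPartitions = 4)
    println(s"read ${files.count()} files")

    sc.stop()
  }
}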
core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 34 additions & 32 deletions
@@ -22,7 +22,6 @@ import java.net.Socket
 
 import akka.actor.ActorSystem
 
-import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.util.Properties
 
@@ -90,39 +89,42 @@
   private var driverTmpDirToDelete: Option[String] = None
 
   private[spark] def stop() {
-    isStopped = true
-    pythonWorkers.foreach { case(key, worker) => worker.stop() }
-    Option(httpFileServer).foreach(_.stop())
-    mapOutputTracker.stop()
-    shuffleManager.stop()
-    broadcastManager.stop()
-    blockManager.stop()
-    blockManager.master.stop()
-    metricsSystem.stop()
-    outputCommitCoordinator.stop()
-    rpcEnv.shutdown()
-
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    // actorSystem.awaitTermination()
-
-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
-      case Some(path) => {
-        try {
-          Utils.deleteRecursively(new File(path))
-        } catch {
-          case e: Exception =>
-            logWarning(s"Exception while deleting Spark temp dir: $path", e)
+
+    if (!isStopped) {
+      isStopped = true
+      pythonWorkers.values.foreach(_.stop())
+      Option(httpFileServer).foreach(_.stop())
+      mapOutputTracker.stop()
+      shuffleManager.stop()
+      broadcastManager.stop()
+      blockManager.stop()
+      blockManager.master.stop()
+      metricsSystem.stop()
+      outputCommitCoordinator.stop()
+      rpcEnv.shutdown()
+
+      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+      // down, but let's call it anyway in case it gets fixed in a later release
+      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+      // actorSystem.awaitTermination()
+
+      // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+      // If we only stop sc, but the driver process still run as a services then we need to delete
+      // the tmp dir, if not, it will create too many tmp dirs.
+      // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+      // current working dir in executor which we do not need to delete.
+      driverTmpDirToDelete match {
+        case Some(path) => {
+          try {
+            Utils.deleteRecursively(new File(path))
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while deleting Spark temp dir: $path", e)
+          }
         }
+        case None => // We just need to delete tmp dir created by driver, so do nothing on executor
       }
-      case None => // We just need to delete tmp dir created by driver, so do nothing on executor
     }
   }
 

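The net effect of the change above is that calling `stop()` more than once is now harmless. A generic sketch of the same guard pattern outside Spark (class and member names are illustrative only):

class Resource {
  @volatile private var isStopped = false

  def stop(): Unit = {
    if (!isStopped) {       // later calls fall through without re-running cleanup
      isStopped = true
      println("releasing resources once")
    }
  }
}

object StopTwiceSketch extends App {
  val r = new Resource
  r.stop()
  r.stop()                  // no-op the second time
}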
core/src/main/scala/org/apache/spark/api/r/RRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -391,7 +391,7 @@ private[r] object RRDD {
   }
 
   private def createRProcess(rLibDir: String, port: Int, script: String): BufferedStreamThread = {
-    val rCommand = "Rscript"
+    val rCommand = SparkEnv.get.conf.get("spark.sparkr.r.command", "Rscript")
     val rOptions = "--vanilla"
     val rExecScript = rLibDir + "/SparkR/worker/" + script
     val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript))

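This makes the R worker executable configurable through `spark.sparkr.r.command`, falling back to `Rscript` on the PATH. A hedged sketch of supplying a non-default binary via `SparkConf` (the path is an example only); the same key can also be passed with `--conf` on spark-submit:

import org.apache.spark.SparkConf

object SparkRCommandSketch {
  def main(args: Array[String]): Unit = {
    // Point SparkR worker processes at a specific Rscript binary.
    val conf = new SparkConf()
      .setAppName("sparkr-command-sketch")
      .set("spark.sparkr.r.command", "/opt/R-3.2.0/bin/Rscript") // example path
    println(conf.get("spark.sparkr.r.command"))
  }
}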
0 commit comments