Commit a0fd790

Merge remote-tracking branch 'upstream/master' into streaming-kmeans
2 parents: 9fd9c15 + 3a845d3

8,273 files changed: +40,302 / -47,672 lines


.rat-excludes

Lines changed: 1 addition & 0 deletions
@@ -48,6 +48,7 @@ sbt-launch-lib.bash
 plugins.sbt
 work
 .*\.q
+.*\.qv
 golden
 test.out/*
 .*iml

assembly/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -197,6 +197,12 @@
         <artifactId>spark-hive_${scala.binary.version}</artifactId>
         <version>${project.version}</version>
       </dependency>
+    </dependencies>
+  </profile>
+  <profile>
+    <!-- TODO: Move this to "hive" profile once 0.13 JDBC is supported -->
+    <id>hive-0.12.0</id>
+    <dependencies>
       <dependency>
         <groupId>org.apache.spark</groupId>
         <artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -132,6 +132,10 @@
       <groupId>com.twitter</groupId>
       <artifactId>chill-java</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
     <dependency>
       <groupId>commons-net</groupId>
       <artifactId>commons-net</artifactId>
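
The diff above only declares the new RoaringBitmap dependency; it does not show where the library is used in this commit. As a minimal sketch for orientation only (not code from this commit), RoaringBitmap provides compressed bitmaps over int keys:

    import org.roaringbitmap.RoaringBitmap

    // Sketch of the library's basic API, assuming the standard org.roaringbitmap artifact.
    val bits = RoaringBitmap.bitmapOf(1, 3, 100000)  // build a bitmap from a few ints
    bits.add(500)                                    // set one more bit
    println(bits.contains(3))                        // true
    println(bits.getCardinality)                     // 4 bits set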

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 2 additions & 1 deletion
@@ -24,6 +24,7 @@ import scala.collection.mutable.Map
 import scala.reflect.ClassTag
 
 import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
 
 /**
  * A data type that can be accumulated, ie has an commutative and associative "add" operation,
@@ -126,7 +127,7 @@ class Accumulable[R, T] (
   }
 
   // Called by Java when deserializing an object
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     value_ = zero
     deserialized = true
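
This file and several later files in the commit wrap their readObject/writeObject bodies in Utils.tryOrIOException. The helper itself is not part of this diff; a minimal sketch of the intent, assuming it simply rethrows non-IO failures as IOException so the Java serialization hooks honor their declared contract:

    import java.io.IOException
    import scala.util.control.NonFatal

    // Sketch only; the real helper lives in org.apache.spark.util.Utils and is not shown here.
    def tryOrIOException(block: => Unit): Unit = {
      try {
        block
      } catch {
        case e: IOException => throw e               // already an IOException, rethrow as-is
        case NonFatal(t) => throw new IOException(t) // wrap anything else in an IOException
      }
    }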

core/src/main/scala/org/apache/spark/CacheManager.scala

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       val computedValues = rdd.computeOrReadCheckpoint(partition, context)
 
       // If the task is running locally, do not persist the result
-      if (context.runningLocally) {
+      if (context.isRunningLocally) {
         return computedValues
       }
 
core/src/main/scala/org/apache/spark/Partitioner.scala

Lines changed: 2 additions & 2 deletions
@@ -204,7 +204,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => out.defaultWriteObject()
@@ -222,7 +222,7 @@ class RangePartitioner[K : Ordering : ClassTag, V](
   }
 
   @throws(classOf[IOException])
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     val sfactory = SparkEnv.get.serializer
     sfactory match {
       case js: JavaSerializer => in.defaultReadObject()

core/src/main/scala/org/apache/spark/SerializableWritable.scala

Lines changed: 3 additions & 2 deletions
@@ -24,18 +24,19 @@ import org.apache.hadoop.io.ObjectWritable
 import org.apache.hadoop.io.Writable
 
 import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.util.Utils
 
 @DeveloperApi
 class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
   def value = t
   override def toString = t.toString
 
-  private def writeObject(out: ObjectOutputStream) {
+  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
     out.defaultWriteObject()
     new ObjectWritable(t).write(out)
   }
 
-  private def readObject(in: ObjectInputStream) {
+  private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     val ow = new ObjectWritable()
     ow.setConf(new Configuration())

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 16 additions & 1 deletion
@@ -18,7 +18,8 @@
 package org.apache.spark
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{HashMap, LinkedHashSet}
+import org.apache.spark.serializer.KryoSerializer
 
 /**
  * Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
@@ -140,6 +141,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     this
   }
 
+  /**
+   * Use Kryo serialization and register the given set of classes with Kryo.
+   * If called multiple times, this will append the classes from all calls together.
+   */
+  def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {
+    val allClassNames = new LinkedHashSet[String]()
+    allClassNames ++= get("spark.kryo.classesToRegister", "").split(',').filter(!_.isEmpty)
+    allClassNames ++= classes.map(_.getName)
+
+    set("spark.kryo.classesToRegister", allClassNames.mkString(","))
+    set("spark.serializer", classOf[KryoSerializer].getName)
+    this
+  }
+
   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
     settings.remove(key)
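
Since registerKryoClasses is a new public SparkConf method, a short usage sketch may help; the application classes below are hypothetical stand-ins for whatever types an application serializes:

    import org.apache.spark.SparkConf

    // Hypothetical application types; any classes to be serialized with Kryo would do.
    class CentroidUpdate(val values: Array[Double])
    class ClusterState(val centroids: Array[CentroidUpdate])

    val conf = new SparkConf()
      .setAppName("kryo-example")
      .registerKryoClasses(Array(classOf[CentroidUpdate], classOf[ClusterState]))
    // As the diff shows, this sets spark.serializer to KryoSerializer and appends the
    // class names to spark.kryo.classesToRegister across repeated calls.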

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 6 additions & 11 deletions
@@ -209,16 +209,10 @@ class SparkContext(config: SparkConf) extends Logging {
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  // Create the Spark execution environment (cache, map output tracker, etc)
   conf.set("spark.executor.id", "driver")
-  private[spark] val env = SparkEnv.create(
-    conf,
-    "<driver>",
-    conf.get("spark.driver.host"),
-    conf.get("spark.driver.port").toInt,
-    isDriver = true,
-    isLocal = isLocal,
-    listenerBus = listenerBus)
+
+  // Create the Spark execution environment (cache, map output tracker, etc)
+  private[spark] val env = SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
@@ -837,11 +831,12 @@ class SparkContext(config: SparkConf) extends Logging {
       case "local" => "file:" + uri.getPath
      case _ => path
     }
-    addedFiles(key) = System.currentTimeMillis
+    val timestamp = System.currentTimeMillis
+    addedFiles(key) = timestamp
 
     // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().
     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
-      hadoopConfiguration)
+      hadoopConfiguration, timestamp, useCache = false)
 
     logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
     postEnvironmentUpdate()

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 46 additions & 5 deletions
@@ -68,13 +68,15 @@ class SparkEnv (
     val shuffleMemoryManager: ShuffleMemoryManager,
     val conf: SparkConf) extends Logging {
 
+  private[spark] var isStopped = false
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation
   // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
   private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
 
   private[spark] def stop() {
+    isStopped = true
     pythonWorkers.foreach { case(key, worker) => worker.stop() }
     Option(httpFileServer).foreach(_.stop())
     mapOutputTracker.stop()
@@ -142,24 +144,63 @@ object SparkEnv extends Logging {
     env
   }
 
-  private[spark] def create(
+  /**
+   * Create a SparkEnv for the driver.
+   */
+  private[spark] def createDriverEnv(
+      conf: SparkConf,
+      isLocal: Boolean,
+      listenerBus: LiveListenerBus): SparkEnv = {
+    assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
+    assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
+    val hostname = conf.get("spark.driver.host")
+    val port = conf.get("spark.driver.port").toInt
+    create(conf, "<driver>", hostname, port, true, isLocal, listenerBus)
+  }
+
+  /**
+   * Create a SparkEnv for an executor.
+   * In coarse-grained mode, the executor provides an actor system that is already instantiated.
+   */
+  private[spark] def createExecutorEnv(
+      conf: SparkConf,
+      executorId: String,
+      hostname: String,
+      port: Int,
+      isLocal: Boolean,
+      actorSystem: ActorSystem = null): SparkEnv = {
+    create(conf, executorId, hostname, port, false, isLocal, defaultActorSystem = actorSystem)
+  }
+
+  /**
+   * Helper method to create a SparkEnv for a driver or an executor.
+   */
+  private def create(
       conf: SparkConf,
       executorId: String,
       hostname: String,
       port: Int,
       isDriver: Boolean,
       isLocal: Boolean,
-      listenerBus: LiveListenerBus = null): SparkEnv = {
+      listenerBus: LiveListenerBus = null,
+      defaultActorSystem: ActorSystem = null): SparkEnv = {
 
     // Listener bus is only used on the driver
     if (isDriver) {
       assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!")
     }
 
     val securityManager = new SecurityManager(conf)
-    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
-      actorSystemName, hostname, port, conf, securityManager)
+
+    // If an existing actor system is already provided, use it.
+    // This is the case when an executor is launched in coarse-grained mode.
+    val (actorSystem, boundPort) =
+      Option(defaultActorSystem) match {
+        case Some(as) => (as, port)
+        case None =>
+          val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
+          AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
      }
 
     // Figure out which port Akka actually bound to in case the original port is 0 or occupied.
     // This is so that we tell the executors the correct port to connect to.