Commit 240c91e

Merge branch 'master' of git://git.apache.org/spark into SPARK-2970
2 parents: 0e7b45d + 8c5a222

200 files changed: 6426 additions, 3146 deletions


.travis.yml

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
 
   /**
    * Whether the cleaning thread will block on cleanup tasks.
-   * This is set to true only for tests.
+   *
+   * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
+   * workaround for the issue, which is ultimately caused by the way the BlockManager actors
+   * issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
+   * for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
+   * longer in scope.
    */
   private val blockOnCleanupTasks = sc.conf.getBoolean(
-    "spark.cleaner.referenceTracking.blocking", false)
+    "spark.cleaner.referenceTracking.blocking", true)
 
   @volatile private var stopped = false
 
@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private def blockManagerMaster = sc.env.blockManager.master
   private def broadcastManager = sc.env.broadcastManager
   private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-
-  // Used for testing. These methods explicitly blocks until cleanup is completed
-  // to ensure that more reliable testing.
 }
 
 private object ContextCleaner {
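With this default flipped, applications that relied on non-blocking cleanup must now opt back in explicitly. A minimal sketch of doing so, assuming a standard SparkContext setup (the app name and master below are illustrative, not from the commit):

import org.apache.spark.{SparkConf, SparkContext}

object CleanerConfigExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("cleaner-config-example") // illustrative name
      .setMaster("local[2]")                // illustrative master
      // Restore the pre-SPARK-3015 behavior: the cleaning thread no longer
      // blocks on cleanup tasks.
      .set("spark.cleaner.referenceTracking.blocking", "false")
    val sc = new SparkContext(conf)
    // ... application logic ...
    sc.stop()
  }
}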

core/src/main/scala/org/apache/spark/SparkConf.scala

Lines changed: 8 additions & 2 deletions
@@ -45,7 +45,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
-  private val settings = new HashMap[String, String]()
+  private[spark] val settings = new HashMap[String, String]()
 
   if (loadDefaults) {
     // Load any spark.* system properties
@@ -210,6 +210,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     new SparkConf(false).setAll(settings)
   }
 
+  /**
+   * By using this instead of System.getenv(), environment variables can be mocked
+   * in unit tests.
+   */
+  private[spark] def getenv(name: String): String = System.getenv(name)
+
   /** Checks for illegal or deprecated config settings. Throws an exception for the former. Not
    * idempotent - may mutate this conf object to convert deprecated settings to supported ones. */
   private[spark] def validateSettings() {
@@ -227,7 +233,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     // Validate spark.executor.extraJavaOptions
     settings.get(executorOptsKey).map { javaOpts =>
       if (javaOpts.contains("-Dspark")) {
-        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts)'. " +
+        val msg = s"$executorOptsKey is not allowed to set Spark options (was '$javaOpts'). " +
          "Set them directly on a SparkConf or in a properties file when using ./bin/spark-submit."
        throw new Exception(msg)
      }
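The point of routing environment lookups through SparkConf.getenv is testability. A minimal sketch of a test helper that exploits this (the subclass is hypothetical, and must live in the org.apache.spark package to override the private[spark] member):

package org.apache.spark

// Hypothetical test-only subclass: serves stubbed environment variables first,
// falling back to the real process environment for anything not stubbed.
class SparkConfWithMockEnv(env: Map[String, String]) extends SparkConf(loadDefaults = false) {
  override private[spark] def getenv(name: String): String =
    env.getOrElse(name, super.getenv(name))
}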

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
       "MapOutputTracker",
       new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
+    // Let the user specify short names for shuffle managers
+    val shortShuffleMgrNames = Map(
+      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
+      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
+    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
+    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
+    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
+
+    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
+
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, mapOutputTracker)
+      serializer, conf, securityManager, mapOutputTracker, shuffleManager)
 
     val connectionManager = blockManager.connectionManager
 
@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
       "."
     }
 
-    // Let the user specify short names for shuffle managers
-    val shortShuffleMgrNames = Map(
-      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
-      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
-    val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
-    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
-    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
-
-    val shuffleMemoryManager = new ShuffleMemoryManager(conf)
-
     // Warn about deprecated spark.cache.class property
     if (conf.contains("spark.cache.class")) {
       logWarning("The spark.cache.class property is no longer being used! Specify storage " +
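A self-contained sketch of the short-name resolution this diff moves ahead of BlockManager construction: the two built-in short names map to fully qualified class names, and any other value is assumed to already be a class name. The wrapper object and main method are illustrative only:

object ShuffleManagerNames {
  // Mirrors the mapping in SparkEnv: short aliases for the built-in managers.
  private val shortShuffleMgrNames = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")

  // Unknown names pass through unchanged, so a custom ShuffleManager can be
  // supplied by its fully qualified class name.
  def resolve(name: String): String =
    shortShuffleMgrNames.getOrElse(name.toLowerCase, name)

  def main(args: Array[String]): Unit = {
    println(resolve("SORT"))                         // ...sort.SortShuffleManager
    println(resolve("com.example.MyShuffleManager")) // passed through unchanged
  }
}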

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 9 additions & 1 deletion
@@ -62,7 +62,7 @@ private[spark] class PythonRDD(
     val env = SparkEnv.get
     val localdir = env.blockManager.diskBlockManager.localDirs.map(
       f => f.getPath()).mkString(",")
-    envVars += ("SPARK_LOCAL_DIR" -> localdir) // it's also used in monitor thread
+    envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread
     val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
 
     // Start a thread to feed the process input from our parent's iterator
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
 
+  def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
+    val file = new DataInputStream(new FileInputStream(filename))
+    val length = file.readInt()
+    val obj = new Array[Byte](length)
+    file.readFully(obj)
+    sc.broadcast(obj)
+  }
+
   def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
     // The right way to implement this would be to use TypeTags to get the full
     // type of T. Since I don't want to introduce breaking changes throughout the
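readBroadcastFromFile expects a simple on-disk layout: a 4-byte big-endian length written by DataOutputStream.writeInt, followed by exactly that many payload bytes. A minimal sketch of the writing side (this helper is hypothetical and not part of the commit; in PySpark the file would be produced on the Python side):

import java.io.{DataOutputStream, FileOutputStream}

object BroadcastFileWriter {
  def write(filename: String, payload: Array[Byte]): Unit = {
    val out = new DataOutputStream(new FileOutputStream(filename))
    try {
      out.writeInt(payload.length) // read back by file.readInt()
      out.write(payload)           // read back by file.readFully(obj)
    } finally {
      out.close()
    }
  }
}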

core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala

Lines changed: 11 additions & 0 deletions
@@ -32,8 +32,19 @@ import org.apache.spark.annotation.DeveloperApi
  */
 @DeveloperApi
 trait BroadcastFactory {
+
   def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit
+
+  /**
+   * Creates a new broadcast variable.
+   *
+   * @param value value to broadcast
+   * @param isLocal whether we are in local mode (single JVM process)
+   * @param id unique id representing this broadcast variable
+   */
   def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
+
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit
+
   def stop(): Unit
 }
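The expanded doc comments make the extension contract of this @DeveloperApi trait explicit. A skeletal sketch of what an implementation has to provide (the class below is hypothetical and deliberately inert; a real factory must hand back working Broadcast[T] instances):

import scala.reflect.ClassTag
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.broadcast.{Broadcast, BroadcastFactory}

// Hypothetical, inert implementation showing the shape of the contract.
class NoOpBroadcastFactory extends BroadcastFactory {
  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit = ()
  override def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
    throw new UnsupportedOperationException("sketch only: no broadcast mechanism")
  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = ()
  override def stop(): Unit = ()
}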
