Skip to content

Commit 085aef8

Browse files
committed
Merge branch 'master' into groupby
Conflicts: python/pyspark/rdd.py
2 parents 3ee58e5 + cd0720c commit 085aef8

File tree

149 files changed

+3453
-1533
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

149 files changed

+3453
-1533
lines changed

.travis.yml

Lines changed: 0 additions & 32 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,10 +66,15 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
6666

6767
/**
6868
* Whether the cleaning thread will block on cleanup tasks.
69-
* This is set to true only for tests.
69+
*
70+
* Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
71+
* workaround for the issue, which is ultimately caused by the way the BlockManager actors
72+
* issue inter-dependent blocking Akka messages to each other at high frequencies. This happens,
73+
* for instance, when the driver performs a GC and cleans up all broadcast blocks that are no
74+
* longer in scope.
7075
*/
7176
private val blockOnCleanupTasks = sc.conf.getBoolean(
72-
"spark.cleaner.referenceTracking.blocking", false)
77+
"spark.cleaner.referenceTracking.blocking", true)
7378

7479
@volatile private var stopped = false
7580

@@ -174,9 +179,6 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
174179
private def blockManagerMaster = sc.env.blockManager.master
175180
private def broadcastManager = sc.env.broadcastManager
176181
private def mapOutputTrackerMaster = sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
177-
178-
// Used for testing. These methods explicitly blocks until cleanup is completed
179-
// to ensure that more reliable testing.
180182
}
181183

182184
private object ContextCleaner {

core/src/main/scala/org/apache/spark/SparkEnv.scala

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -210,12 +210,22 @@ object SparkEnv extends Logging {
210210
"MapOutputTracker",
211211
new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
212212

213+
// Let the user specify short names for shuffle managers
214+
val shortShuffleMgrNames = Map(
215+
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
216+
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
217+
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
218+
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
219+
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
220+
221+
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
222+
213223
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
214224
"BlockManagerMaster",
215225
new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
216226

217227
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
218-
serializer, conf, securityManager, mapOutputTracker)
228+
serializer, conf, securityManager, mapOutputTracker, shuffleManager)
219229

220230
val connectionManager = blockManager.connectionManager
221231

@@ -250,16 +260,6 @@ object SparkEnv extends Logging {
250260
"."
251261
}
252262

253-
// Let the user specify short names for shuffle managers
254-
val shortShuffleMgrNames = Map(
255-
"hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
256-
"sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager")
257-
val shuffleMgrName = conf.get("spark.shuffle.manager", "hash")
258-
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
259-
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
260-
261-
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
262-
263263
// Warn about deprecated spark.cache.class property
264264
if (conf.contains("spark.cache.class")) {
265265
logWarning("The spark.cache.class property is no longer being used! Specify storage " +

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,14 @@ private[spark] object PythonRDD extends Logging {
315315
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
316316
}
317317

318+
def readBroadcastFromFile(sc: JavaSparkContext, filename: String): Broadcast[Array[Byte]] = {
319+
val file = new DataInputStream(new FileInputStream(filename))
320+
val length = file.readInt()
321+
val obj = new Array[Byte](length)
322+
file.readFully(obj)
323+
sc.broadcast(obj)
324+
}
325+
318326
def writeIteratorToStream[T](iter: Iterator[T], dataOut: DataOutputStream) {
319327
// The right way to implement this would be to use TypeTags to get the full
320328
// type of T. Since I don't want to introduce breaking changes throughout the

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@
1717

1818
package org.apache.spark.broadcast
1919

20-
import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
21-
ObjectInputStream, ObjectOutputStream, OutputStream}
20+
import java.io._
2221

2322
import scala.reflect.ClassTag
2423
import scala.util.Random
@@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](
5352

5453
private val broadcastId = BroadcastBlockId(id)
5554

56-
TorrentBroadcast.synchronized {
57-
SparkEnv.get.blockManager.putSingle(
58-
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
59-
}
55+
SparkEnv.get.blockManager.putSingle(
56+
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
6057

6158
@transient private var arrayOfBlocks: Array[TorrentBlock] = null
6259
@transient private var totalBlocks = -1
@@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
9188
// Store meta-info
9289
val metaId = BroadcastBlockId(id, "meta")
9390
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
94-
TorrentBroadcast.synchronized {
95-
SparkEnv.get.blockManager.putSingle(
96-
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
97-
}
91+
SparkEnv.get.blockManager.putSingle(
92+
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
9893

9994
// Store individual pieces
10095
for (i <- 0 until totalBlocks) {
10196
val pieceId = BroadcastBlockId(id, "piece" + i)
102-
TorrentBroadcast.synchronized {
103-
SparkEnv.get.blockManager.putSingle(
104-
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
105-
}
97+
SparkEnv.get.blockManager.putSingle(
98+
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
10699
}
107100
}
108101

@@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
165158
val metaId = BroadcastBlockId(id, "meta")
166159
var attemptId = 10
167160
while (attemptId > 0 && totalBlocks == -1) {
168-
TorrentBroadcast.synchronized {
169-
SparkEnv.get.blockManager.getSingle(metaId) match {
170-
case Some(x) =>
171-
val tInfo = x.asInstanceOf[TorrentInfo]
172-
totalBlocks = tInfo.totalBlocks
173-
totalBytes = tInfo.totalBytes
174-
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
175-
hasBlocks = 0
176-
177-
case None =>
178-
Thread.sleep(500)
179-
}
161+
SparkEnv.get.blockManager.getSingle(metaId) match {
162+
case Some(x) =>
163+
val tInfo = x.asInstanceOf[TorrentInfo]
164+
totalBlocks = tInfo.totalBlocks
165+
totalBytes = tInfo.totalBytes
166+
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
167+
hasBlocks = 0
168+
169+
case None =>
170+
Thread.sleep(500)
180171
}
181172
attemptId -= 1
182173
}
174+
183175
if (totalBlocks == -1) {
184176
return false
185177
}
@@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
192184
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
193185
for (pid <- recvOrder) {
194186
val pieceId = BroadcastBlockId(id, "piece" + pid)
195-
TorrentBroadcast.synchronized {
196-
SparkEnv.get.blockManager.getSingle(pieceId) match {
197-
case Some(x) =>
198-
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
199-
hasBlocks += 1
200-
SparkEnv.get.blockManager.putSingle(
201-
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
187+
SparkEnv.get.blockManager.getSingle(pieceId) match {
188+
case Some(x) =>
189+
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
190+
hasBlocks += 1
191+
SparkEnv.get.blockManager.putSingle(
192+
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
202193

203-
case None =>
204-
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
205-
}
194+
case None =>
195+
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
206196
}
207197
}
208198

@@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
291281
* If removeFromDriver is true, also remove these persisted blocks on the driver.
292282
*/
293283
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
294-
synchronized {
295-
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
296-
}
284+
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
297285
}
298286
}
299287

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ private[spark] class Master(
697697
appIdToUI(app.id) = ui
698698
webUi.attachSparkUI(ui)
699699
// Application UI is successfully rebuilt, so link the Master UI to it
700-
app.desc.appUiUrl = ui.basePath
700+
app.desc.appUiUrl = ui.getBasePath
701701
true
702702
} catch {
703703
case e: Exception =>

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 14 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ private[spark] class Worker(
7272
val APP_DATA_RETENTION_SECS = conf.getLong("spark.worker.cleanup.appDataTtl", 7 * 24 * 3600)
7373

7474
val testing: Boolean = sys.props.contains("spark.testing")
75-
val masterLock: Object = new Object()
7675
var master: ActorSelection = null
7776
var masterAddress: Address = null
7877
var activeMasterUrl: String = ""
@@ -145,18 +144,16 @@ private[spark] class Worker(
145144
}
146145

147146
def changeMaster(url: String, uiUrl: String) {
148-
masterLock.synchronized {
149-
activeMasterUrl = url
150-
activeMasterWebUiUrl = uiUrl
151-
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
152-
masterAddress = activeMasterUrl match {
153-
case Master.sparkUrlRegex(_host, _port) =>
154-
Address("akka.tcp", Master.systemName, _host, _port.toInt)
155-
case x =>
156-
throw new SparkException("Invalid spark URL: " + x)
157-
}
158-
connected = true
147+
activeMasterUrl = url
148+
activeMasterWebUiUrl = uiUrl
149+
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
150+
masterAddress = activeMasterUrl match {
151+
case Master.sparkUrlRegex(_host, _port) =>
152+
Address("akka.tcp", Master.systemName, _host, _port.toInt)
153+
case x =>
154+
throw new SparkException("Invalid spark URL: " + x)
159155
}
156+
connected = true
160157
}
161158

162159
def tryRegisterAllMasters() {
@@ -199,9 +196,7 @@ private[spark] class Worker(
199196
}
200197

201198
case SendHeartbeat =>
202-
masterLock.synchronized {
203-
if (connected) { master ! Heartbeat(workerId) }
204-
}
199+
if (connected) { master ! Heartbeat(workerId) }
205200

206201
case WorkDirCleanup =>
207202
// Spin up a separate thread (in a future) to do the dir cleanup; don't tie up worker actor
@@ -244,27 +239,21 @@ private[spark] class Worker(
244239
manager.start()
245240
coresUsed += cores_
246241
memoryUsed += memory_
247-
masterLock.synchronized {
248-
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
249-
}
242+
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
250243
} catch {
251244
case e: Exception => {
252245
logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
253246
if (executors.contains(appId + "/" + execId)) {
254247
executors(appId + "/" + execId).kill()
255248
executors -= appId + "/" + execId
256249
}
257-
masterLock.synchronized {
258-
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
259-
}
250+
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
260251
}
261252
}
262253
}
263254

264255
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
265-
masterLock.synchronized {
266-
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
267-
}
256+
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
268257
val fullId = appId + "/" + execId
269258
if (ExecutorState.isFinished(state)) {
270259
executors.get(fullId) match {
@@ -330,9 +319,7 @@ private[spark] class Worker(
330319
case _ =>
331320
logDebug(s"Driver $driverId changed state to $state")
332321
}
333-
masterLock.synchronized {
334-
master ! DriverStateChanged(driverId, state, exception)
335-
}
322+
master ! DriverStateChanged(driverId, state, exception)
336323
val driver = drivers.remove(driverId).get
337324
finishedDrivers(driverId) = driver
338325
memoryUsed -= driver.driverDesc.mem

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ private[spark] class Executor(
9999
private val urlClassLoader = createClassLoader()
100100
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
101101

102+
// Set the classloader for serializer
103+
env.serializer.setDefaultClassLoader(urlClassLoader)
104+
102105
// Akka's message frame size. If task result is bigger than this, we use the block manager
103106
// to send the result back.
104107
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

0 commit comments

Comments
 (0)