Skip to content

Commit 2b95b5e

Browse files
committed
Added more documentation on Broadcast implementations, specially which blocks are told about to the driver. Also, fixed Broadcast API to hide destroy functionality.
1 parent 41c9ece commit 2b95b5e

File tree

7 files changed

+122
-38
lines changed

7 files changed

+122
-38
lines changed

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
7676
/** Start the cleaner. */
7777
def start() {
7878
cleaningThread.setDaemon(true)
79-
cleaningThread.setName("ContextCleaner")
79+
cleaningThread.setName("Spark Context Cleaner")
8080
cleaningThread.start()
8181
}
8282

core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import org.apache.spark.SparkException
2828
* attempts to distribute broadcast variables using efficient broadcast algorithms to reduce
2929
* communication cost.
3030
*
31-
* Broadcast variables are created from a variable `v` by calling [[SparkContext#broadcast]].
31+
* Broadcast variables are created from a variable `v` by calling
32+
* [[org.apache.spark.SparkContext#broadcast]].
3233
* The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the
3334
* `value` method. The interpreter session below shows this:
3435
*
@@ -51,15 +52,17 @@ import org.apache.spark.SparkException
5152
*/
5253
abstract class Broadcast[T](val id: Long) extends Serializable {
5354

54-
protected var _isValid: Boolean = true
55-
5655
/**
57-
* Whether this Broadcast is actually usable. This should be false once persisted state is
58-
* removed from the driver.
56+
* Flag signifying whether the broadcast variable is valid
57+
* (that is, not already destroyed) or not.
5958
*/
60-
def isValid: Boolean = _isValid
59+
@volatile private var _isValid = true
6160

62-
def value: T
61+
/** Get the broadcasted value. */
62+
def value: T = {
63+
assertValid()
64+
getValue()
65+
}
6366

6467
/**
6568
* Asynchronously delete cached copies of this broadcast on the executors.
@@ -74,23 +77,50 @@ abstract class Broadcast[T](val id: Long) extends Serializable {
7477
* this is called, it will need to be re-sent to each executor.
7578
* @param blocking Whether to block until unpersisting has completed
7679
*/
77-
def unpersist(blocking: Boolean)
80+
def unpersist(blocking: Boolean) {
81+
assertValid()
82+
doUnpersist(blocking)
83+
}
7884

7985
/**
80-
* Remove all persisted state associated with this broadcast on both the executors and
81-
* the driver.
86+
* Destroy all data and metadata related to this broadcast variable. Use this with caution;
87+
* once a broadcast variable has been destroyed, it cannot be used again.
8288
*/
8389
private[spark] def destroy(blocking: Boolean) {
90+
assertValid()
8491
_isValid = false
85-
onDestroy(blocking)
92+
doDestroy(blocking)
8693
}
8794

88-
protected def onDestroy(blocking: Boolean)
95+
/**
96+
* Whether this Broadcast is actually usable. This should be false once persisted state is
97+
* removed from the driver.
98+
*/
99+
private[spark] def isValid: Boolean = {
100+
_isValid
101+
}
102+
103+
/**
104+
* Actually get the broadcasted value. Concrete implementations of Broadcast class must
105+
* define their own way to get the value.
106+
*/
107+
private[spark] def getValue(): T
89108

90109
/**
91-
* If this broadcast is no longer valid, throw an exception.
110+
* Actually unpersist the broadcasted value on the executors. Concrete implementations of
111+
* Broadcast class must define their own logic to unpersist their own data.
92112
*/
93-
protected def assertValid() {
113+
private[spark] def doUnpersist(blocking: Boolean)
114+
115+
/**
116+
* Actually destroy all data and metadata related to this broadcast variable.
117+
* Implementation of Broadcast class must define their own logic to destroy their own
118+
* state.
119+
*/
120+
private[spark] def doDestroy(blocking: Boolean)
121+
122+
/** Check if this broadcast is valid. If not valid, exception is thrown. */
123+
private[spark] def assertValid() {
94124
if (!_isValid) {
95125
throw new SparkException("Attempted to use %s after it has been destroyed!".format(toString))
96126
}

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,16 +28,26 @@ import org.apache.spark.io.CompressionCodec
2828
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
2929
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
3030

31+
/**
32+
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses HTTP server
33+
* as a broadcast mechanism. The first time a HTTP broadcast variable (sent as part of a
34+
* task) is deserialized in the executor, the broadcasted data is fetched from the driver
35+
* (through a HTTP server running at the driver) and stored in the BlockManager of the
36+
* executor to speed up future accesses.
37+
*/
3138
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
3239
extends Broadcast[T](id) with Logging with Serializable {
3340

34-
def value: T = {
35-
assertValid()
36-
value_
37-
}
41+
def getValue = value_
3842

3943
val blockId = BroadcastBlockId(id)
4044

45+
/*
46+
* Broadcasted data is also stored in the BlockManager of the driver.
47+
* The BlockManagerMaster
48+
* does not need to be told about this block as not only
49+
* need to know about this data block.
50+
*/
4151
HttpBroadcast.synchronized {
4252
SparkEnv.get.blockManager.putSingle(
4353
blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
@@ -50,21 +60,24 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
5060
/**
5161
* Remove all persisted state associated with this HTTP broadcast on the executors.
5262
*/
53-
def unpersist(blocking: Boolean) {
63+
def doUnpersist(blocking: Boolean) {
5464
HttpBroadcast.unpersist(id, removeFromDriver = false, blocking)
5565
}
5666

57-
protected def onDestroy(blocking: Boolean) {
67+
/**
68+
* Remove all persisted state associated with this HTTP broadcast on the executors and driver.
69+
*/
70+
def doDestroy(blocking: Boolean) {
5871
HttpBroadcast.unpersist(id, removeFromDriver = true, blocking)
5972
}
6073

61-
// Used by the JVM when serializing this object
74+
/** Used by the JVM when serializing this object. */
6275
private def writeObject(out: ObjectOutputStream) {
6376
assertValid()
6477
out.defaultWriteObject()
6578
}
6679

67-
// Used by the JVM when deserializing this object
80+
/** Used by the JVM when deserializing this object. */
6881
private def readObject(in: ObjectInputStream) {
6982
in.defaultReadObject()
7083
HttpBroadcast.synchronized {
@@ -74,6 +87,13 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
7487
logInfo("Started reading broadcast variable " + id)
7588
val start = System.nanoTime
7689
value_ = HttpBroadcast.read[T](id)
90+
/*
91+
* Storing the broadcast data in BlockManager so that all
92+
* so that all subsequent tasks using the broadcast variable
93+
* does not need to fetch it again. The BlockManagerMaster
94+
* does not need to be told about this block as no one
95+
* needs to know about this data block.
96+
*/
7797
SparkEnv.get.blockManager.putSingle(
7898
blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
7999
val time = (System.nanoTime - start) / 1e9

core/src/main/scala/org/apache/spark/broadcast/HttpBroadcastFactory.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ package org.apache.spark.broadcast
2020
import org.apache.spark.{SecurityManager, SparkConf}
2121

2222
/**
23-
* A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium.
23+
* A [[org.apache.spark.broadcast.BroadcastFactory]] implementation that uses a
24+
* HTTP server as the broadcast mechanism. Refer to
25+
* [[org.apache.spark.broadcast.HttpBroadcast]] for more details about this mechanism.
2426
*/
2527
class HttpBroadcastFactory extends BroadcastFactory {
2628
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,28 @@ import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException}
2626
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
2727
import org.apache.spark.util.Utils
2828

29+
/**
30+
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
31+
* protocol to do a distributed transfer of the broadcasted data to the executors.
32+
* The mechanism is as follows. The driver divides the serializes the broadcasted data,
33+
* divides it into smaller chunks, and stores them in the BlockManager of the driver.
34+
* These chunks are reported to the BlockManagerMaster so that all the executors can
35+
* learn the location of those chunks. The first time the broadcast variable (sent as
36+
* part of task) is deserialized at a executor, all the chunks are fetched using
37+
* the BlockManager. When all the chunks are fetched (initially from the driver's
38+
* BlockManager), they are combined and deserialized to recreate the broadcasted data.
39+
* However, the chunks are also stored in the BlockManager and reported to the
40+
* BlockManagerMaster. As more executors fetch the chunks, BlockManagerMaster learns
41+
* multiple locations for each chunk. Hence, subsequent fetches of each chunk will be
42+
* made to other executors who already have those chunks, resulting in a distributed
43+
* fetching. This prevents the driver from being the bottleneck in sending out multiple
44+
* copies of the broadcast data (one per executor) as done by the
45+
* [[org.apache.spark.broadcast.HttpBroadcast]].
46+
*/
2947
private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
3048
extends Broadcast[T](id) with Logging with Serializable {
3149

32-
def value = {
33-
assertValid()
34-
value_
35-
}
50+
def getValue = value_
3651

3752
val broadcastId = BroadcastBlockId(id)
3853

@@ -53,15 +68,19 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
5368
/**
5469
* Remove all persisted state associated with this Torrent broadcast on the executors.
5570
*/
56-
def unpersist(blocking: Boolean) {
71+
def doUnpersist(blocking: Boolean) {
5772
TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
5873
}
5974

60-
protected def onDestroy(blocking: Boolean) {
75+
/**
76+
* Remove all persisted state associated with this Torrent broadcast on the executors
77+
* and driver.
78+
*/
79+
def doDestroy(blocking: Boolean) {
6180
TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
6281
}
6382

64-
private def sendBroadcast() {
83+
def sendBroadcast() {
6584
val tInfo = TorrentBroadcast.blockifyObject(value_)
6685
totalBlocks = tInfo.totalBlocks
6786
totalBytes = tInfo.totalBytes
@@ -85,13 +104,13 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
85104
}
86105
}
87106

88-
// Used by the JVM when serializing this object
107+
/** Used by the JVM when serializing this object. */
89108
private def writeObject(out: ObjectOutputStream) {
90109
assertValid()
91110
out.defaultWriteObject()
92111
}
93112

94-
// Used by the JVM when deserializing this object
113+
/** Used by the JVM when deserializing this object. */
95114
private def readObject(in: ObjectInputStream) {
96115
in.defaultReadObject()
97116
TorrentBroadcast.synchronized {
@@ -111,7 +130,11 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
111130

112131
/* Store the merged copy in cache so that the next worker doesn't need to rebuild it.
113132
* This creates a trade-off between memory usage and latency. Storing copy doubles
114-
* the memory footprint; not storing doubles deserialization cost. */
133+
* the memory footprint; not storing doubles deserialization cost. Also,
134+
* this does not need to be reported to BlockManagerMaster since other executors
135+
* does not need to access this block (they only need to fetch the chunks,
136+
* which are reported).
137+
*/
115138
SparkEnv.get.blockManager.putSingle(
116139
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
117140

@@ -135,7 +158,8 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
135158
}
136159

137160
def receiveBroadcast(): Boolean = {
138-
// Receive meta-info
161+
// Receive meta-info about the size of broadcast data,
162+
// the number of chunks it is divided into, etc.
139163
val metaId = BroadcastBlockId(id, "meta")
140164
var attemptId = 10
141165
while (attemptId > 0 && totalBlocks == -1) {
@@ -158,7 +182,11 @@ private[spark] class TorrentBroadcast[T](@transient var value_ : T, isLocal: Boo
158182
return false
159183
}
160184

161-
// Receive actual blocks
185+
/*
186+
* Fetch actual chunks of data. Note that all these chunks are stored in
187+
* the BlockManager and reported to the master, so that other executors
188+
* can find out and pull the chunks from this executor.
189+
*/
162190
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
163191
for (pid <- recvOrder) {
164192
val pieceId = BroadcastBlockId(id, "piece" + pid)

core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ package org.apache.spark.broadcast
2020
import org.apache.spark.{SecurityManager, SparkConf}
2121

2222
/**
23-
* A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
23+
* A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
24+
* protocol to do a distributed transfer of the broadcasted data to the executors. Refer to
25+
* [[org.apache.spark.broadcast.TorrentBroadcast]] for more details.
2426
*/
2527
class TorrentBroadcastFactory extends BroadcastFactory {
2628

core/src/test/scala/org/apache/spark/BroadcastSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark
2020
import org.scalatest.FunSuite
2121

2222
import org.apache.spark.storage._
23-
import org.apache.spark.broadcast.HttpBroadcast
23+
import org.apache.spark.broadcast.{Broadcast, HttpBroadcast}
2424
import org.apache.spark.storage.BroadcastBlockId
2525

2626
class BroadcastSuite extends FunSuite with LocalSparkContext {
@@ -298,6 +298,8 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
298298
// Using this variable on the executors crashes them, which hangs the test.
299299
// Instead, crash the driver by directly accessing the broadcast value.
300300
intercept[SparkException] { broadcast.value }
301+
intercept[SparkException] { broadcast.unpersist() }
302+
intercept[SparkException] { broadcast.destroy(blocking = true) }
301303
} else {
302304
val results = sc.parallelize(1 to partitions, partitions).map(x => (x, broadcast.value.sum))
303305
assert(results.collect().toSet === (1 to partitions).map(x => (x, list.sum)).toSet)

0 commit comments

Comments
 (0)