Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.broadcast

import java.io._
import java.lang.ref.SoftReference
import java.nio.ByteBuffer
import java.util.zip.Adler32

Expand Down Expand Up @@ -63,9 +64,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
* Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
* which builds this value by reading blocks from the driver and/or other executors.
*
* On the driver, if the value is required, it is read lazily from the block manager. We hold
* a soft reference so that it can be garbage collected if required, as we can always
* reconstruct it in the future by re-reading the blocks.
*/
// Null until first access; thereafter a SoftReference that the JVM may clear under
// memory pressure, in which case getValue() re-reads the blocks and re-caches.
@transient private var _value: SoftReference[T] = _

/** The compression codec to use, or None if compression is disabled */
@transient private var compressionCodec: Option[CompressionCodec] = _
Expand Down Expand Up @@ -94,8 +97,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
/** The checksum for all the blocks. */
private var checksums: Array[Int] = _

/**
 * Returns the broadcast value, memoizing it behind a [[SoftReference]].
 *
 * Synchronized on this broadcast instance so concurrent callers do not each fetch the
 * blocks: if the reference has never been set, or the JVM has cleared it under memory
 * pressure, the value is reconstructed via readBroadcastBlock() and re-cached.
 */
override protected def getValue() = synchronized {
  // _value starts as null (never fetched); SoftReference.get returns null once cleared by GC.
  val memoized: T = if (_value == null) null.asInstanceOf[T] else _value.get
  if (memoized != null) {
    memoized
  } else {
    val newlyRead = readBroadcastBlock()
    _value = new SoftReference[T](newlyRead)
    newlyRead
  }
}

private def calcChecksum(block: ByteBuffer): Int = {
Expand Down Expand Up @@ -209,8 +219,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
}

private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
broadcastCache.synchronized {

Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
setConf(SparkEnv.get.conf)
Expand Down