From 0b0a252c57f75e2671aecef6c84bc29de3f6b18b Mon Sep 17 00:00:00 2001
From: w00228970
Date: Wed, 28 Sep 2016 11:24:25 +0800
Subject: [PATCH 01/13] executor-side broadcast

---
 .../spark/broadcast/BroadcastFactory.scala    |  11 ++
 .../spark/broadcast/BroadcastManager.scala    |  30 +++++
 .../spark/broadcast/TorrentBroadcast.scala    |  29 +++--
 .../broadcast/TorrentBroadcastFactory.scala   |  10 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala |  52 ++++++++-
 .../apache/spark/storage/BlockManager.scala   | 103 ++++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  48 +++++++-
 .../plans/physical/broadcastMode.scala        |   3 +-
 .../exchange/BroadcastExchangeExec.scala      |  64 +++++++----
 .../exchange/EnsureRequirements.scala         |   2 +-
 .../apache/spark/sql/internal/SQLConf.scala   |   7 ++
 11 files changed, 320 insertions(+), 39 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index ece4ae6ab031..c0911f6ea70e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -40,6 +40,17 @@ private[spark] trait BroadcastFactory {
    */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]

+  /**
+   * Create a new broadcast variable with a specified id. It differs from the original
+   * interface in the new parameter `isExecutorSide`, which tells the broadcast that it is an
+   * executor-side broadcast and should attempt recovery when fetching block data fails.
+   */
+  def newBroadcast[T: ClassTag](
+      value: T,
+      isLocal: Boolean,
+      id: Long,
+      isExecutorSide: Boolean): Broadcast[T]
+
  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit

  def stop(): Unit

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index e88988fe03b2..178274688e24 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -52,11 +52,41 @@ private[spark] class BroadcastManager(

   private val nextBroadcastId = new AtomicLong(0)

+  // Called from driver to create new broadcast id
+  def newBroadcastId: Long = nextBroadcastId.getAndIncrement()
+
   def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
     broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
   }

+  // Called from executor to create broadcast with specified id
+  def newBroadcast[T: ClassTag](
+      value_ : T,
+      isLocal: Boolean,
+      id: Long
+    ): Broadcast[T] = {
+    broadcastFactory.newBroadcast[T](value_, isLocal, id)
+  }
+
+  // Called from driver to create broadcast with specified id
+  def newBroadcast[T: ClassTag](
+      value_ : T,
+      isLocal: Boolean,
+      id: Long,
+      isExecutorSide: Boolean): Broadcast[T] = {
+    broadcastFactory.newBroadcast[T](value_, isLocal, id, isExecutorSide)
+  }
+
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
   }
 }
+
+/**
+ * Marker trait to identify the shape in which tuples are broadcasted. This is used for
+ * executor-side broadcast; typical examples are identity (tuples remain unchanged) or hashed
+ * (tuples are converted into some hash index).
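+ *
+ * A hypothetical pass-through implementation, for illustration only (not part of this patch):
+ * {{{
+ *   val passThrough = new TransFunc[String, Array[String]] {
+ *     override def transform(rows: Array[String]): Array[String] = rows
+ *   }
+ * }}}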
+ */ +trait TransFunc[T, U] extends Serializable { + def transform(rows: Array[T]): U +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 22d01c47e645..6060be974771 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -54,7 +54,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea * @param obj object to broadcast * @param id A unique identifier for the broadcast variable. */ -private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) +private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorSide: Boolean) extends Broadcast[T](id) with Logging with Serializable { /** @@ -84,8 +84,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) private val broadcastId = BroadcastBlockId(id) + def setNumBlocks(n: Int): Unit = { + numBlocks = n + } + + def getNumBlocks(): Int = numBlocks + /** Total number of blocks this broadcast variable contains. */ - private val numBlocks: Int = writeBlocks(obj) + private var numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else -1 /** Whether to generate checksum for blocks or not. */ private var checksumEnabled: Boolean = false @@ -132,8 +138,9 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) checksums(i) = calcChecksum(block) } val pieceId = BroadcastBlockId(id, "piece" + i) + blockManager.persistBroadcast(pieceId, block) val bytes = new ChunkedByteBuffer(block.duplicate()) - if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { + if (!blockManager.putBytes[T](pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") } } @@ -158,7 +165,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) blocks(pid) = block releaseLock(pieceId) case None => - bm.getRemoteBytes(pieceId) match { + bm.getRemoteBytes(pieceId).orElse(bm.getHdfsBytes(pieceId)) match { case Some(b) => if (checksumEnabled) { val sum = calcChecksum(b.chunks(0)) @@ -169,7 +176,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) } // We found the block from remote executors/driver's BlockManager, so put the block // in this executor's BlockManager. - if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { + if (!bm.putBytes[T]( + pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException( s"Failed to store $pieceId of $broadcastId in local BlockManager") } @@ -194,7 +202,11 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) * and driver. */ override protected def doDestroy(blocking: Boolean) { - TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking) + if (isExecutorSide) { + TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking) + } else { + TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking) + } } /** Used by the JVM when serializing this object. 
*/ @@ -222,7 +234,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) val blocks = readBlocks().flatMap(_.getChunks()) logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - val obj = TorrentBroadcast.unBlockifyObject[T]( + val res = TorrentBroadcast.unBlockifyObject[T]( blocks, SparkEnv.get.serializer, compressionCodec) // Store the merged copy in BlockManager so other tasks on this executor don't // need to re-fetch it. @@ -230,7 +242,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long) if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { throw new SparkException(s"Failed to store $broadcastId in BlockManager") } - obj + res } } } @@ -301,5 +313,6 @@ private object TorrentBroadcast extends Logging { def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = { logDebug(s"Unpersisting TorrentBroadcast $id") SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking) + SparkEnv.get.blockManager.cleanBroadcastPieces(id) } } diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index b11f9ba171b8..1b0cebd11807 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -31,7 +31,15 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { } override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = { - new TorrentBroadcast[T](value_, id) + new TorrentBroadcast[T](value_, id, false) + } + + override def newBroadcast[T: ClassTag]( + value: T, + isLocal: Boolean, + id: Long, + isExecutorSide: Boolean): Broadcast[T] = { + new TorrentBroadcast[T](value, id, isExecutorSide) } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 374abccf6ad5..3c347601a505 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -34,6 +34,7 @@ import org.apache.spark._ import org.apache.spark.Partitioner._ import org.apache.spark.annotation.{DeveloperApi, Since} import org.apache.spark.api.java.JavaRDD +import org.apache.spark.broadcast.{Broadcast, TorrentBroadcast, TransFunc} import org.apache.spark.internal.Logging import org.apache.spark.partial.BoundedDouble import org.apache.spark.partial.CountEvaluator @@ -42,8 +43,7 @@ import org.apache.spark.partial.PartialResult import org.apache.spark.storage.{RDDBlockId, StorageLevel} import org.apache.spark.util.{BoundedPriorityQueue, Utils} import org.apache.spark.util.collection.OpenHashMap -import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, - SamplingUtils} +import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, SamplingUtils} /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, @@ -937,6 +937,54 @@ abstract class RDD[T: ClassTag]( } /** + * Broadcast the rdd to the cluster from executor, returning a + * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. + * The variable will be sent to each cluster only once. 
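+ *
+ * For example (an illustrative sketch, not part of this patch), broadcasting the first
+ * element of an RDD[Int]:
+ * {{{
+ *   val bc = rdd.broadcast(new TransFunc[Int, Int] {
+ *     override def transform(rows: Array[Int]): Int = rows.headOption.getOrElse(0)
+ *   })
+ * }}}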
+ * + * User should pass in a translate function to compute the broadcast value from the rdd. + */ + def broadcast[U: ClassTag](transFunc: TransFunc[T, U]): Broadcast[U] = withScope { + val bc = if (partitions.size > 0) { + val id = sc.env.broadcastManager.newBroadcastId + // create broadcast from driver, do not write blocks in driver. + val res = SparkEnv.get.broadcastManager.newBroadcast( + transFunc.transform(Array.empty[T]), false, id, true) + + val numBlocks = coalesce(1).mapPartitions { iter => + // write blocks in executor. + val bc = SparkEnv.get.broadcastManager.newBroadcast( + transFunc.transform(iter.toArray), false, id) + val numBlocks = bc.asInstanceOf[TorrentBroadcast[U]].getNumBlocks() + Seq(numBlocks).iterator + }.collect().head + // set num blocks in driver side + res.asInstanceOf[TorrentBroadcast[U]].setNumBlocks(numBlocks) + val callSite = sc.getCallSite + logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm) + res + } else { + val res = SparkEnv.get.broadcastManager.newBroadcast( + transFunc.transform(Array.empty[T]), sc.isLocal) + val callSite = sc.getCallSite + logInfo("Created broadcast " + res.id + " from " + callSite.shortForm) + res + } + + sc.cleaner.foreach(_.registerBroadcastForCleanup(bc)) + bc + } + + // executor-side broadcast api + def broadcast[U: ClassTag](f: Iterator[T] => U): Broadcast[U] = withScope { + val transFunc = new TransFunc[T, U] { + override def transform(rows: Array[T]): U = { + f(rows.toIterator) + } + } + broadcast(transFunc) + } + + /** * Return an iterator that contains all of the elements in this RDD. * * The iterator will consume as much memory as the largest partition in this RDD. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 04521c9159ea..2d7d1bf4525e 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,7 +28,10 @@ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal +import org.apache.hadoop.fs.{PathFilter, Path, FileSystem} + import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, MemoryMode} @@ -75,6 +78,20 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + private[spark] var hdfsDir: Option[Path] = None + + /** + * Set the current working dir from the outside param. + * Note: this will have a ambiguous problem if we check the dir with test case: + * `run Spark in yarn-client mode` and `run Python application in yarn-client mode` and + * `run Python application in yarn-cluster mode` and `external shuffle service`. + * + * @param dir only in yarn-mode, dir will be well defined and must be the staging dir. + */ + private[spark] def initializeCurrentDir(dir: String): Unit = { + + } + val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = @@ -161,6 +178,9 @@ private[spark] class BlockManager( * service if configured. 
*/ def initialize(appId: String): Unit = { + val dir = conf.get("spark.hdfs.dir", "/tmp/spark/" + appId) + hdfsDir = Option(new Path(dir)) + blockTransferService.init(this) shuffleClient.init(appId) @@ -198,6 +218,89 @@ private[spark] class BlockManager( logInfo(s"Initialized BlockManager: $blockManagerId") } + def persistBroadcast(id: BlockId, block: ByteBuffer): Unit = { + hdfsDir.foreach { dirPath => + val blockFile = id.toString + val filePath = new Path(dirPath, blockFile) + var fs: FileSystem = null + try { + fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) + if (fs.exists(filePath)) { + logWarning(s"File(${filePath.getName})already exists.") + } + } catch { + case e: IOException => + logWarning("Error when list files in doing persistBroadcast", e) + return + } + try { + val outStream = fs.create(filePath) + outStream.write(block.array()) + outStream.hflush() + outStream.close() + logInfo(s"Store block: $blockFile into underlying fs.") + } catch { + case e: IOException => + logWarning("Error when writing file into file system and try to clean the file", e) + try { + fs.delete(filePath, true) + } catch { + case e: Exception => + logWarning(s"Failed to clean the broadcast file{$blockFile}.", e) + } + } + } + } + + def cleanBroadcastPieces(id: Long): Unit = { + hdfsDir.foreach { dirPath => + val fileFilter = new PathFilter { + override def accept(pathname: Path): Boolean = { + pathname.getName.startsWith(s"broadcast_${id}_piece") + } + } + try { + val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) + val files = fs.listStatus(dirPath, fileFilter).map(_.getPath) + for (file <- files) { + if (fs.exists(file)) { + fs.delete(file, true) + logInfo(s"Underlying fs file: ${file.getName} has been clean.") + } + } + } catch { + case e: IOException => + logWarning(s"Failed to clean the broadcast file{${dirPath.toString}} in cleaning.", e) + } + } + } + + def getHdfsBytes(id: BlockId): Option[ChunkedByteBuffer] = { + try { + hdfsDir.map { dirPath => + val blockFile = id.toString + val filePath = new Path(dirPath, blockFile) + val fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) + (filePath, fs) + }.filter { case(path, fs) => + fs.exists(path) + }.map { case (filePath, fs) => + val inputStream = fs.open(filePath) + val status = fs.getFileStatus(filePath) + val buffer = new Array[Byte](status.getLen.toInt) + inputStream.readFully(0, buffer) + inputStream.close() + logInfo(s"Got bytes from underling fs file: ${filePath.getName}.") + new ChunkedByteBuffer(ByteBuffer.wrap(buffer)) + } + } catch { + case e: Exception => + logError("Error in read the broadCast value from underlying fs ", e) + None + } + } + + private def registerWithExternalShuffleServer() { logInfo("Registering executor with local external shuffle service.") val shuffleConfig = new ExecutorShuffleInfo( diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index ad56715656c8..5cd89bbd767c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -22,14 +22,14 @@ import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.ClassTag - import com.esotericsoftware.kryo.KryoException import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.{FileSplit, TextInputFormat} - import org.apache.spark._ import 
org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.broadcast.TransFunc import org.apache.spark.rdd.RDDSuiteUtils._ +import org.apache.spark.storage.BroadcastBlockId import org.apache.spark.util.Utils class RDDSuite extends SparkFunSuite with SharedSparkContext { @@ -48,6 +48,50 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("executor broadcast") { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val transFun = new TransFunc[Int, Int] { + override def transform(rows: Array[Int]): Int = { + if (rows.size > 0) rows.apply(0) else 10 + } + } + val b1 = nums.broadcast(transFun) + val b2 = nums.broadcast { iter => + if (iter.hasNext) iter.next() else 10 + } + assert(b1.value == 1) + assert(b2.value == 1) + } + + test("executor broadcast --- empty rdd") { + val empty = sc.makeRDD(Array.empty[Int], 2) + val transFun = new TransFunc[Int, Int] { + override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.apply(0) else 10 + } + val b1 = empty.broadcast(transFun) + assert(b1.value == 10) + val b2 = empty.broadcast { iter => + if (iter.hasNext) iter.next() else 10 + } + assert(b2.value == 10) + } + + test("executor broadcast --- broadcast data lost from executor") { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val transFun = new TransFunc[Int, Int] { + override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.apply(0) else 10 + } + val b1 = nums.broadcast(transFun) + sc.env.blockManager.removeBroadcast(b1.id, false) + assert(b1.value == 1) + + val b2 = nums.broadcast{ iter => + if (iter.hasNext) iter.next() else 10 + } + sc.env.blockManager.removeBroadcast(b2.id, false) + assert(b2.value == 1) + } + test("basic operations") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.getNumPartitions === 2) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala index 9dfdf4da78ff..feaabf046c0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/broadcastMode.scala @@ -17,13 +17,14 @@ package org.apache.spark.sql.catalyst.plans.physical +import org.apache.spark.broadcast.TransFunc import org.apache.spark.sql.catalyst.InternalRow /** * Marker trait to identify the shape in which tuples are broadcasted. Typical examples of this are * identity (tuples remain unchanged) or hashed (tuples are converted into some hash index). 
*/ -trait BroadcastMode { +trait BroadcastMode extends TransFunc[InternalRow, Any]{ def transform(rows: Array[InternalRow]): Any /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index 7be5d31d4a76..a8f83b7ac1bd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -38,13 +38,18 @@ import org.apache.spark.util.ThreadUtils */ case class BroadcastExchangeExec( mode: BroadcastMode, - child: SparkPlan) extends Exchange { + child: SparkPlan, + conf: SQLConf) extends Exchange { + + private def executorBroadcast: Boolean = conf.executorBroadcastEnabled override lazy val metrics = Map( "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"), "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect (ms)"), "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"), - "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)")) + "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"), + "collect_build_broadcastTime" -> SQLMetrics.createMetric(sparkContext, + "time to collect, build and broadcast (ms) in executor broadcast")) override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) @@ -73,30 +78,41 @@ case class BroadcastExchangeExec( // with the correct execution. SQLExecution.withExecutionId(sparkContext, executionId) { try { - val beforeCollect = System.nanoTime() - // Note that we use .executeCollect() because we don't want to convert data to Scala types - val input: Array[InternalRow] = child.executeCollect() - if (input.length >= 512000000) { - throw new SparkException( - s"Cannot broadcast the table with more than 512 millions rows: ${input.length} rows") - } - val beforeBuild = System.nanoTime() - longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000 - val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum - longMetric("dataSize") += dataSize - if (dataSize >= (8L << 30)) { - throw new SparkException( - s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB") + val broadcasted = if (executorBroadcast) { + val before = System.nanoTime() + val res = + child.execute().mapPartitions { iter => + iter.map(_.copy()) + }.broadcast(mode) + longMetric("collect_build_broadcastTime") += (System.nanoTime() - before) / 1000000 + res + } else { + val beforeCollect = System.nanoTime() + // Note that we use .executeCollect() because we don't want to + // convert data to Scala types + val input: Array[InternalRow] = child.executeCollect() + if (input.length >= 512000000) { + throw new SparkException( + s"Cannot broadcast the table with more than" + + s"512 millions rows: ${input.length} rows") + } + val beforeBuild = System.nanoTime() + longMetric("collectTime") += (beforeBuild - beforeCollect) / 1000000 + val dataSize = input.map(_.asInstanceOf[UnsafeRow].getSizeInBytes.toLong).sum + longMetric("dataSize") += dataSize + if (dataSize >= (8L << 30)) { + throw new SparkException( + s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB") + } + // Construct and broadcast the relation. 
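+            // (For HashedRelationBroadcastMode this builds a HashedRelation; for
+            // IdentityBroadcastMode the collected rows are kept unchanged.)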
+ val relation = mode.transform(input) + val beforeBroadcast = System.nanoTime() + longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000 + val res = sparkContext.broadcast(relation) + longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000 + res } - // Construct and broadcast the relation. - val relation = mode.transform(input) - val beforeBroadcast = System.nanoTime() - longMetric("buildTime") += (beforeBroadcast - beforeBuild) / 1000000 - - val broadcasted = sparkContext.broadcast(relation) - longMetric("broadcastTime") += (System.nanoTime() - beforeBroadcast) / 1000000 - // There are some cases we don't care about the metrics and call `SparkPlan.doExecute` // directly without setting an execution id. We should be tolerant to it. if (executionId != null) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index f17049949aa4..5f223af5bb79 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -160,7 +160,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => child case (child, BroadcastDistribution(mode)) => - BroadcastExchangeExec(mode, child) + BroadcastExchangeExec(mode, child, conf) case (child, distribution) => ShuffleExchange(createPartitioning(distribution, defaultNumPreShufflePartitions), child) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 304dcb691b32..45a96c3f2f44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -154,6 +154,11 @@ object SQLConf { .booleanConf .createWithDefault(false) + val EXECUTOR_BROADCAST_N_ENABLED = SQLConfigBuilder("spark.sql.executorBroadcast.enabled") + .doc("When true, broadcast join use executor side broadcast.") + .booleanConf + .createWithDefault(true) + val SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS = SQLConfigBuilder("spark.sql.adaptive.minNumPostShufflePartitions") .internal() @@ -727,6 +732,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED) + def executorBroadcastEnabled: Boolean = getConf(EXECUTOR_BROADCAST_N_ENABLED) + def minNumPostShufflePartitions: Int = getConf(SHUFFLE_MIN_NUM_POSTSHUFFLE_PARTITIONS) From 31eaefb26576d1b4f87b69ca2c2a40a7666c81d6 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 11:31:20 +0800 Subject: [PATCH 02/13] renaming better --- .../apache/spark/broadcast/BroadcastFactory.scala | 5 ++--- .../apache/spark/broadcast/BroadcastManager.scala | 15 +++++++-------- .../apache/spark/broadcast/TorrentBroadcast.scala | 8 ++++---- .../spark/broadcast/TorrentBroadcastFactory.scala | 7 +++---- .../src/main/scala/org/apache/spark/rdd/RDD.scala | 7 ++++--- .../org/apache/spark/storage/BlockManager.scala | 14 +------------- .../scala/org/apache/spark/rdd/RDDSuite.scala | 3 ++- .../exchange/BroadcastExchangeExec.scala | 2 +- 8 files changed, 24 insertions(+), 37 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala 
b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index c0911f6ea70e..2d92d72918c4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -45,11 +45,10 @@ private[spark] trait BroadcastFactory { * is that there is a new param `isExecutorSide` to tell the BroadCast it is a executor-side * broadcast and should consider recovery when get block data failed. */ - def newBroadcast[T: ClassTag]( + def newExecutorBroadcast[T: ClassTag]( value: T, isLocal: Boolean, - id: Long, - isExecutorSide: Boolean): Broadcast[T] + id: Long): Broadcast[T] def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index 178274688e24..109ac4be8e07 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -69,12 +69,11 @@ private[spark] class BroadcastManager( } // Called from driver to create broadcast with specified id - def newBroadcast[T: ClassTag]( + def newExecutorBroadcast[T: ClassTag]( value_ : T, isLocal: Boolean, - id: Long, - isExecutorSide: Boolean): Broadcast[T] = { - broadcastFactory.newBroadcast[T](value_, isLocal, id, isExecutorSide) + id: Long): Broadcast[T] = { + broadcastFactory.newExecutorBroadcast[T](value_, isLocal, id) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { @@ -83,10 +82,10 @@ private[spark] class BroadcastManager( } /** - * Marker trait to identify the shape in which tuples are broadcasted. This is used for executor-side - * broadcast, typical examples of this are identity (tuples remain unchanged) or hashed (tuples are - * converted into some hash index). + * Marker trait to identify the shape in which tuples are broadcasted. This is used for + * executor-side broadcast, typical examples of this are identity (tuples remain unchanged) + * or hashed (tuples are converted into some hash index). */ trait TransFunc[T, U] extends Serializable { def transform(rows: Array[T]): U -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 6060be974771..e29aff92e1bc 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -140,7 +140,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorS val pieceId = BroadcastBlockId(id, "piece" + i) blockManager.persistBroadcast(pieceId, block) val bytes = new ChunkedByteBuffer(block.duplicate()) - if (!blockManager.putBytes[T](pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { + if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") } } @@ -176,7 +176,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorS } // We found the block from remote executors/driver's BlockManager, so put the block // in this executor's BlockManager. 
- if (!bm.putBytes[T]( + if (!bm.putBytes( pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException( s"Failed to store $pieceId of $broadcastId in local BlockManager") @@ -234,7 +234,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorS val blocks = readBlocks().flatMap(_.getChunks()) logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs)) - val res = TorrentBroadcast.unBlockifyObject[T]( + val obj = TorrentBroadcast.unBlockifyObject[T]( blocks, SparkEnv.get.serializer, compressionCodec) // Store the merged copy in BlockManager so other tasks on this executor don't // need to re-fetch it. @@ -242,7 +242,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorS if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) { throw new SparkException(s"Failed to store $broadcastId in BlockManager") } - res + obj } } } diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 1b0cebd11807..93fcbaee9875 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -34,12 +34,11 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { new TorrentBroadcast[T](value_, id, false) } - override def newBroadcast[T: ClassTag]( + override def newExecutorBroadcast[T: ClassTag]( value: T, isLocal: Boolean, - id: Long, - isExecutorSide: Boolean): Broadcast[T] = { - new TorrentBroadcast[T](value, id, isExecutorSide) + id: Long): Broadcast[T] = { + new TorrentBroadcast[T](value, id, true) } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3c347601a505..55a2687b485a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -946,9 +946,10 @@ abstract class RDD[T: ClassTag]( def broadcast[U: ClassTag](transFunc: TransFunc[T, U]): Broadcast[U] = withScope { val bc = if (partitions.size > 0) { val id = sc.env.broadcastManager.newBroadcastId - // create broadcast from driver, do not write blocks in driver. - val res = SparkEnv.get.broadcastManager.newBroadcast( - transFunc.transform(Array.empty[T]), false, id, true) + + // create broadcast from driver, do not write blocks in driver when construct. + val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( + transFunc.transform(Array.empty[T]), false, id) val numBlocks = coalesce(1).mapPartitions { iter => // write blocks in executor. diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 2d7d1bf4525e..6fdc995ca5e6 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,7 +28,7 @@ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal -import org.apache.hadoop.fs.{PathFilter, Path, FileSystem} +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -80,18 +80,6 @@ private[spark] class BlockManager( private[spark] var hdfsDir: Option[Path] = None - /** - * Set the current working dir from the outside param. 
- * Note: this will have a ambiguous problem if we check the dir with test case: - * `run Spark in yarn-client mode` and `run Python application in yarn-client mode` and - * `run Python application in yarn-cluster mode` and `external shuffle service`. - * - * @param dir only in yarn-mode, dir will be well defined and must be the staging dir. - */ - private[spark] def initializeCurrentDir(dir: String): Unit = { - - } - val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 5cd89bbd767c..b8975de95778 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -22,14 +22,15 @@ import java.io.{File, IOException, ObjectInputStream, ObjectOutputStream} import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.reflect.ClassTag + import com.esotericsoftware.kryo.KryoException import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapred.{FileSplit, TextInputFormat} + import org.apache.spark._ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.broadcast.TransFunc import org.apache.spark.rdd.RDDSuiteUtils._ -import org.apache.spark.storage.BroadcastBlockId import org.apache.spark.util.Utils class RDDSuite extends SparkFunSuite with SharedSparkContext { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala index a8f83b7ac1bd..a061dcfdbdfc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala @@ -49,7 +49,7 @@ case class BroadcastExchangeExec( "buildTime" -> SQLMetrics.createMetric(sparkContext, "time to build (ms)"), "broadcastTime" -> SQLMetrics.createMetric(sparkContext, "time to broadcast (ms)"), "collect_build_broadcastTime" -> SQLMetrics.createMetric(sparkContext, - "time to collect, build and broadcast (ms) in executor broadcast")) + "time to collect, build and broadcast (ms)")) override def outputPartitioning: Partitioning = BroadcastPartitioning(mode) From 9dbdd8c5b7ace83eef9516ab81375c18668fb73f Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 11:57:31 +0800 Subject: [PATCH 03/13] add finally --- .../apache/spark/storage/BlockManager.scala | 34 +++++++++++++++---- .../spark/sql/execution/ExchangeSuite.scala | 6 ++-- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6fdc995ca5e6..16ca95fdf056 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,7 +28,7 @@ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal -import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.hadoop.fs._ import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -221,21 +221,34 @@ private[spark] class BlockManager( logWarning("Error when list files in doing persistBroadcast", e) return } + var shouldDeleteFile: Boolean = 
false + var outStream: FSDataOutputStream = null try { - val outStream = fs.create(filePath) + outStream = fs.create(filePath) outStream.write(block.array()) outStream.hflush() - outStream.close() logInfo(s"Store block: $blockFile into underlying fs.") } catch { case e: IOException => - logWarning("Error when writing file into file system and try to clean the file", e) + logWarning("Error when backing broadcast to hdfs and try to clean the file", e) + shouldDeleteFile = true + } finally { + if (null != outStream) { + try { + outStream.close() + } catch { + case e: Throwable => + logWarning("Can't close the output stream.", e) + } + } + if (shouldDeleteFile) { try { fs.delete(filePath, true) } catch { case e: Exception => logWarning(s"Failed to clean the broadcast file{$blockFile}.", e) } + } } } } @@ -264,6 +277,7 @@ private[spark] class BlockManager( } def getHdfsBytes(id: BlockId): Option[ChunkedByteBuffer] = { + var inputStream: FSDataInputStream = null try { hdfsDir.map { dirPath => val blockFile = id.toString @@ -273,11 +287,10 @@ private[spark] class BlockManager( }.filter { case(path, fs) => fs.exists(path) }.map { case (filePath, fs) => - val inputStream = fs.open(filePath) + inputStream = fs.open(filePath) val status = fs.getFileStatus(filePath) val buffer = new Array[Byte](status.getLen.toInt) inputStream.readFully(0, buffer) - inputStream.close() logInfo(s"Got bytes from underling fs file: ${filePath.getName}.") new ChunkedByteBuffer(ByteBuffer.wrap(buffer)) } @@ -285,6 +298,15 @@ private[spark] class BlockManager( case e: Exception => logError("Error in read the broadCast value from underlying fs ", e) None + } finally { + if (null != inputStream) { + try { + inputStream.close() + } catch { + case e: Throwable => + logWarning("Can't close the input stream.", e) + } + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala index 36cde3233dce..fb1e6fc50ce5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExchangeSuite.scala @@ -55,12 +55,12 @@ class ExchangeSuite extends SparkPlanTest with SharedSQLContext { val output = plan.output assert(plan sameResult plan) - val exchange1 = BroadcastExchangeExec(IdentityBroadcastMode, plan) + val exchange1 = BroadcastExchangeExec(IdentityBroadcastMode, plan, spark.sessionState.conf) val hashMode = HashedRelationBroadcastMode(output) - val exchange2 = BroadcastExchangeExec(hashMode, plan) + val exchange2 = BroadcastExchangeExec(hashMode, plan, spark.sessionState.conf) val hashMode2 = HashedRelationBroadcastMode(Alias(output.head, "id2")() :: Nil) - val exchange3 = BroadcastExchangeExec(hashMode2, plan) + val exchange3 = BroadcastExchangeExec(hashMode2, plan, spark.sessionState.conf) val exchange4 = ReusedExchangeExec(output, exchange3) assert(exchange1 sameResult exchange1) From 55c2b8ed986ab9e5cb65152dd93eaa2089fafaf5 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 15:19:08 +0800 Subject: [PATCH 04/13] improve the rdd broadcast api --- .../spark/broadcast/BroadcastFactory.scala | 9 +++++--- .../spark/broadcast/BroadcastManager.scala | 15 +++++++------ .../spark/broadcast/TorrentBroadcast.scala | 18 +++++++++------- .../broadcast/TorrentBroadcastFactory.scala | 12 +++++++---- .../main/scala/org/apache/spark/rdd/RDD.scala | 21 +++++++++---------- .../apache/spark/storage/BlockManager.scala | 2 +- 6 files changed, 42 
insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index 2d92d72918c4..b3dc467a523a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -41,14 +41,17 @@ private[spark] trait BroadcastFactory { def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] /** - * create a new broadcast variable with a specified id. The different of the origin interface + * Creates a new broadcast variable with a specified id. The different of the origin interface * is that there is a new param `isExecutorSide` to tell the BroadCast it is a executor-side * broadcast and should consider recovery when get block data failed. */ def newExecutorBroadcast[T: ClassTag]( value: T, - isLocal: Boolean, - id: Long): Broadcast[T] + id: Long, + nBlocks: Int): Broadcast[T] + + // Called from executor to put broadcast data to blockmanager. + def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index 109ac4be8e07..12faec69d2c7 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -59,21 +59,20 @@ private[spark] class BroadcastManager( broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement()) } - // Called from executor to create broadcast with specified id - def newBroadcast[T: ClassTag]( + // Called from executor to upload broadcast data to blockmanager. + def uploadBroadcast[T: ClassTag]( value_ : T, - isLocal: Boolean, id: Long - ): Broadcast[T] = { - broadcastFactory.newBroadcast[T](value_, isLocal, id) + ): Int = { + broadcastFactory.uploadBroadcast[T](value_, id) } // Called from driver to create broadcast with specified id def newExecutorBroadcast[T: ClassTag]( value_ : T, - isLocal: Boolean, - id: Long): Broadcast[T] = { - broadcastFactory.newExecutorBroadcast[T](value_, isLocal, id) + id: Long, + nBlocks: Int): Broadcast[T] = { + broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index e29aff92e1bc..f7e92e30ed3f 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -53,8 +53,14 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea * * @param obj object to broadcast * @param id A unique identifier for the broadcast variable. - */ -private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorSide: Boolean) + * @param isExecutorSide A identifier for executor broadcast variable. + * @param nBlocks how many blocks for executor broadcast. 
+ */ +private[spark] class TorrentBroadcast[T: ClassTag]( + obj: T, + id: Long, + isExecutorSide: Boolean = false, + nBlocks: Option[Int] = None) extends Broadcast[T](id) with Logging with Serializable { /** @@ -84,14 +90,10 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long, isExecutorS private val broadcastId = BroadcastBlockId(id) - def setNumBlocks(n: Int): Unit = { - numBlocks = n - } - - def getNumBlocks(): Int = numBlocks + def getNumBlocks: Int = numBlocks /** Total number of blocks this broadcast variable contains. */ - private var numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else -1 + private var numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else nBlocks.getOrElse(-1) /** Whether to generate checksum for blocks or not. */ private var checksumEnabled: Boolean = false diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 93fcbaee9875..0e5494d08068 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -31,14 +31,18 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { } override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = { - new TorrentBroadcast[T](value_, id, false) + new TorrentBroadcast[T](value_, id) } override def newExecutorBroadcast[T: ClassTag]( value: T, - isLocal: Boolean, - id: Long): Broadcast[T] = { - new TorrentBroadcast[T](value, id, true) + id: Long, + nBlocks: Int): Broadcast[T] = { + new TorrentBroadcast[T](value, id, true, Option(nBlocks)) + } + + override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int = { + new TorrentBroadcast[T](value_, id).getNumBlocks } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 55a2687b485a..99a2e6da7cee 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -947,23 +947,22 @@ abstract class RDD[T: ClassTag]( val bc = if (partitions.size > 0) { val id = sc.env.broadcastManager.newBroadcastId - // create broadcast from driver, do not write blocks in driver when construct. - val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( - transFunc.transform(Array.empty[T]), false, id) - - val numBlocks = coalesce(1).mapPartitions { iter => - // write blocks in executor. - val bc = SparkEnv.get.broadcastManager.newBroadcast( - transFunc.transform(iter.toArray), false, id) - val numBlocks = bc.asInstanceOf[TorrentBroadcast[U]].getNumBlocks() + // first: write blocks to block manager from executor. 
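+      // coalesce(1) funnels the whole RDD into a single task, so exactly one executor
+      // serializes the value and registers the broadcast pieces under the pre-allocated id.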
+ val nBlocks = coalesce(1).mapPartitions { iter => + val numBlocks = + SparkEnv.get.broadcastManager.uploadBroadcast(transFunc.transform(iter.toArray), id) Seq(numBlocks).iterator }.collect().head - // set num blocks in driver side - res.asInstanceOf[TorrentBroadcast[U]].setNumBlocks(numBlocks) + + // then: create broadcast from driver, this will not write blocks + val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( + transFunc.transform(Array.empty[T]), id, nBlocks) + val callSite = sc.getCallSite logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm) res } else { + // Rdd may have 0 partitions, for this case use driver broadcast. val res = SparkEnv.get.broadcastManager.newBroadcast( transFunc.transform(Array.empty[T]), sc.isLocal) val callSite = sc.getCallSite diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 16ca95fdf056..e060f9a2798b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -166,7 +166,7 @@ private[spark] class BlockManager( * service if configured. */ def initialize(appId: String): Unit = { - val dir = conf.get("spark.hdfs.dir", "/tmp/spark/" + appId) + val dir = conf.get("spark.hdfs.dir", s"/tmp/spark/${appId}_blocks") hdfsDir = Option(new Path(dir)) blockTransferService.init(this) From 8347ad63f344d48c8505342586cfc36be22477a8 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 16:05:44 +0800 Subject: [PATCH 05/13] rename improvement --- .../spark/broadcast/TorrentBroadcast.scala | 6 +- .../apache/spark/storage/BlockManager.scala | 134 +++--------------- .../spark/storage/DiskBlockManager.scala | 113 +++++++++++++++ 3 files changed, 137 insertions(+), 116 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index f7e92e30ed3f..b67573ba51ad 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -55,7 +55,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea * @param id A unique identifier for the broadcast variable. * @param isExecutorSide A identifier for executor broadcast variable. * @param nBlocks how many blocks for executor broadcast. 
- */ + */ private[spark] class TorrentBroadcast[T: ClassTag]( obj: T, id: Long, @@ -140,7 +140,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( checksums(i) = calcChecksum(block) } val pieceId = BroadcastBlockId(id, "piece" + i) - blockManager.persistBroadcast(pieceId, block) + blockManager.persistBroadcastPiece(pieceId, block) val bytes = new ChunkedByteBuffer(block.duplicate()) if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") @@ -167,7 +167,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( blocks(pid) = block releaseLock(pieceId) case None => - bm.getRemoteBytes(pieceId).orElse(bm.getHdfsBytes(pieceId)) match { + bm.getRemoteBytes(pieceId).orElse(bm.getBroadcastPiece(pieceId)) match { case Some(b) => if (checksumEnabled) { val sum = calcChecksum(b.chunks(0)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index e060f9a2798b..27acc5818475 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -28,10 +28,7 @@ import scala.reflect.ClassTag import scala.util.Random import scala.util.control.NonFatal -import org.apache.hadoop.fs._ - import org.apache.spark._ -import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics} import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, MemoryMode} @@ -78,8 +75,6 @@ private[spark] class BlockManager( private[spark] val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) - private[spark] var hdfsDir: Option[Path] = None - val diskBlockManager = { // Only perform cleanup if an external service is not serving our shuffle files. val deleteFilesOnStop = @@ -166,9 +161,6 @@ private[spark] class BlockManager( * service if configured. 
*/ def initialize(appId: String): Unit = { - val dir = conf.get("spark.hdfs.dir", s"/tmp/spark/${appId}_blocks") - hdfsDir = Option(new Path(dir)) - blockTransferService.init(this) shuffleClient.init(appId) @@ -206,111 +198,6 @@ private[spark] class BlockManager( logInfo(s"Initialized BlockManager: $blockManagerId") } - def persistBroadcast(id: BlockId, block: ByteBuffer): Unit = { - hdfsDir.foreach { dirPath => - val blockFile = id.toString - val filePath = new Path(dirPath, blockFile) - var fs: FileSystem = null - try { - fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) - if (fs.exists(filePath)) { - logWarning(s"File(${filePath.getName})already exists.") - } - } catch { - case e: IOException => - logWarning("Error when list files in doing persistBroadcast", e) - return - } - var shouldDeleteFile: Boolean = false - var outStream: FSDataOutputStream = null - try { - outStream = fs.create(filePath) - outStream.write(block.array()) - outStream.hflush() - logInfo(s"Store block: $blockFile into underlying fs.") - } catch { - case e: IOException => - logWarning("Error when backing broadcast to hdfs and try to clean the file", e) - shouldDeleteFile = true - } finally { - if (null != outStream) { - try { - outStream.close() - } catch { - case e: Throwable => - logWarning("Can't close the output stream.", e) - } - } - if (shouldDeleteFile) { - try { - fs.delete(filePath, true) - } catch { - case e: Exception => - logWarning(s"Failed to clean the broadcast file{$blockFile}.", e) - } - } - } - } - } - - def cleanBroadcastPieces(id: Long): Unit = { - hdfsDir.foreach { dirPath => - val fileFilter = new PathFilter { - override def accept(pathname: Path): Boolean = { - pathname.getName.startsWith(s"broadcast_${id}_piece") - } - } - try { - val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) - val files = fs.listStatus(dirPath, fileFilter).map(_.getPath) - for (file <- files) { - if (fs.exists(file)) { - fs.delete(file, true) - logInfo(s"Underlying fs file: ${file.getName} has been clean.") - } - } - } catch { - case e: IOException => - logWarning(s"Failed to clean the broadcast file{${dirPath.toString}} in cleaning.", e) - } - } - } - - def getHdfsBytes(id: BlockId): Option[ChunkedByteBuffer] = { - var inputStream: FSDataInputStream = null - try { - hdfsDir.map { dirPath => - val blockFile = id.toString - val filePath = new Path(dirPath, blockFile) - val fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) - (filePath, fs) - }.filter { case(path, fs) => - fs.exists(path) - }.map { case (filePath, fs) => - inputStream = fs.open(filePath) - val status = fs.getFileStatus(filePath) - val buffer = new Array[Byte](status.getLen.toInt) - inputStream.readFully(0, buffer) - logInfo(s"Got bytes from underling fs file: ${filePath.getName}.") - new ChunkedByteBuffer(ByteBuffer.wrap(buffer)) - } - } catch { - case e: Exception => - logError("Error in read the broadCast value from underlying fs ", e) - None - } finally { - if (null != inputStream) { - try { - inputStream.close() - } catch { - case e: Throwable => - logWarning("Can't close the input stream.", e) - } - } - } - } - - private def registerWithExternalShuffleServer() { logInfo("Registering executor with local external shuffle service.") val shuffleConfig = new ExecutorShuffleInfo( @@ -863,6 +750,27 @@ private[spark] class BlockManager( syncWrites, writeMetrics, blockId) } + /** + * Persist a broadcast piece of serialized bytes to the hdfs. 
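+   * This hdfs copy is the recovery path consulted when a piece can no longer be fetched
+   * from any executor's BlockManager (see readBlocks in TorrentBroadcast).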
+ */ + def persistBroadcastPiece(id: BlockId, block: ByteBuffer): Unit = { + diskBlockManager.persistBroadcastPiece(id, block) + } + + /** + * Get broadcast block from the hdfs, as serialized bytes. + */ + def getBroadcastPiece(id: BlockId): Option[ChunkedByteBuffer] = { + diskBlockManager.getBroadcastPiece(id) + } + + /** + * Clean hdfs files for executor broadcast. + */ + def cleanBroadcastPieces(id: Long): Unit = { + diskBlockManager.cleanBroadcastPieces(id) + } + /** * Put a new block of serialized bytes to the block manager. * diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 3d43e3c367aa..9e30286b5ccd 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -18,12 +18,17 @@ package org.apache.spark.storage import java.io.{File, IOException} +import java.nio.ByteBuffer import java.util.UUID +import org.apache.hadoop.fs._ + import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.ExecutorExitCode import org.apache.spark.internal.Logging import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.io.ChunkedByteBuffer /** * Creates and maintains the logical mapping between logical blocks and physical on-disk @@ -44,6 +49,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) } + private[spark] lazy val hdfsDir = + Option(new Path(conf.get("spark.hdfs.dir", s"/tmp/spark/${conf.getAppId}_blocks"))) + // The content of subDirs is immutable but the content of subDirs(i) is mutable. 
And the content // of subDirs(i) is protected by the lock of subDirs(i) private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)) @@ -140,6 +148,111 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } } + + def persistBroadcastPiece(id: BlockId, block: ByteBuffer): Unit = { + hdfsDir.foreach { dirPath => + val blockFile = id.toString + val filePath = new Path(dirPath, blockFile) + var fs: FileSystem = null + try { + fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) + if (fs.exists(filePath)) { + logWarning(s"File(${filePath.getName})already exists.") + } + } catch { + case e: IOException => + logWarning("Error when list files in doing persistBroadcast", e) + return + } + var shouldDeleteFile: Boolean = false + var outStream: FSDataOutputStream = null + try { + outStream = fs.create(filePath) + outStream.write(block.array()) + outStream.hflush() + logInfo(s"Store block: $blockFile into underlying fs.") + } catch { + case e: IOException => + logWarning("Error when backing broadcast to hdfs and try to clean the file", e) + shouldDeleteFile = true + } finally { + if (null != outStream) { + try { + outStream.close() + } catch { + case e: Throwable => + logWarning("Can't close the output stream.", e) + } + } + if (shouldDeleteFile) { + try { + fs.delete(filePath, true) + } catch { + case e: Exception => + logWarning(s"Failed to clean the broadcast file{$blockFile}.", e) + } + } + } + } + } + + def cleanBroadcastPieces(id: Long): Unit = { + hdfsDir.foreach { dirPath => + val fileFilter = new PathFilter { + override def accept(pathname: Path): Boolean = { + pathname.getName.startsWith(s"broadcast_${id}_piece") + } + } + try { + val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) + val files = fs.listStatus(dirPath, fileFilter).map(_.getPath) + for (file <- files) { + if (fs.exists(file)) { + fs.delete(file, true) + logInfo(s"Underlying fs file: ${file.getName} has been clean.") + } + } + } catch { + case e: IOException => + logWarning(s"Failed to clean the broadcast file{${dirPath.toString}} in cleaning.", e) + } + } + } + + def getBroadcastPiece(id: BlockId): Option[ChunkedByteBuffer] = { + var inputStream: FSDataInputStream = null + try { + hdfsDir.map { dirPath => + val blockFile = id.toString + val filePath = new Path(dirPath, blockFile) + val fs = filePath.getFileSystem(SparkHadoopUtil.get.conf) + (filePath, fs) + }.filter { case(path, fs) => + fs.exists(path) + }.map { case (filePath, fs) => + inputStream = fs.open(filePath) + val status = fs.getFileStatus(filePath) + val buffer = new Array[Byte](status.getLen.toInt) + inputStream.readFully(0, buffer) + logInfo(s"Got bytes from underling fs file: ${filePath.getName}.") + new ChunkedByteBuffer(ByteBuffer.wrap(buffer)) + } + } catch { + case e: Exception => + logError("Error in read the broadCast value from underlying fs ", e) + None + } finally { + if (null != inputStream) { + try { + inputStream.close() + } catch { + case e: Throwable => + logWarning("Can't close the input stream.", e) + } + } + } + } + private def addShutdownHook(): AnyRef = { logDebug("Adding shutdown hook") // force eager creation of logger ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () => From a8c7a54f699803407ce8a712da5212833e9eacb2 Mon Sep 17 00:00:00 2001 From: w00228970 Date: Wed, 28 Sep 2016 16:08:27 +0800 Subject: [PATCH 06/13] var => val --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2 +- 1 file changed, 1 insertion(+), 1 
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index b67573ba51ad..ca9668f3e389 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -93,7 +93,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](
   def getNumBlocks: Int = numBlocks
 
   /** Total number of blocks this broadcast variable contains. */
-  private var numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else nBlocks.getOrElse(-1)
+  private val numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else nBlocks.getOrElse(-1)
 
   /** Whether to generate checksum for blocks or not. */
   private var checksumEnabled: Boolean = false

From 41110f4e48c731e4e9b0bc520796948c7ff56f86 Mon Sep 17 00:00:00 2001
From: wangfei
Date: Thu, 29 Sep 2016 14:46:50 +0800
Subject: [PATCH 07/13] add shutdown hook to delete dir

---
 .../spark/broadcast/BroadcastManager.scala | 37 +++++++++++++++++++
 .../spark/broadcast/TorrentBroadcast.scala | 14 +++++++
 .../main/scala/org/apache/spark/rdd/RDD.scala | 12 +++++-
 .../spark/storage/DiskBlockManager.scala | 12 +++---
 .../scala/org/apache/spark/rdd/RDDSuite.scala | 24 ++++++------
 5 files changed, 79 insertions(+), 20 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
index 12faec69d2c7..ef740da5a3f9 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala
@@ -17,12 +17,17 @@
 package org.apache.spark.broadcast
 
+import java.io.IOException
 import java.util.concurrent.atomic.AtomicLong
 
 import scala.reflect.ClassTag
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
+import org.apache.spark.util.ShutdownHookManager
 
 private[spark] class BroadcastManager(
     val isDriver: Boolean,
@@ -32,6 +37,9 @@ private[spark] class BroadcastManager(
 
   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
+  private val shutdownHook = addShutdownHook()
+  private[spark] lazy val hdfsBackupDir =
+    Option(new Path(conf.get("spark.broadcast.backup.dir", s"/tmp/spark/${conf.getAppId}_blocks")))
 
   initialize()
 
@@ -47,6 +55,26 @@
   }
 
   def stop() {
+    // Remove the shutdown hook. It causes memory leaks if we leave it around.
+    try {
+      ShutdownHookManager.removeShutdownHook(shutdownHook)
+    } catch {
+      case e: Exception =>
+        logError(s"Exception while removing shutdown hook.", e)
+    }
+    // Only delete the backup path from the driver, when the app stops.
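+    // The backup dir is app-wide (one dir per application, shared by all
+    // broadcasts), so an executor must never delete it: other executors may
+    // still need to recover pieces from it.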
+    if (isDriver) {
+      hdfsBackupDir.foreach { dirPath =>
+        try {
+          val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf)
+          fs.delete(dirPath, true)
+        } catch {
+          case e: IOException =>
+            logWarning(s"Failed to delete broadcast temp dir $dirPath.", e)
+        }
+      }
+    }
+
     broadcastFactory.stop()
   }
 
@@ -78,6 +106,15 @@
   def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
     broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
   }
+
+  private def addShutdownHook(): AnyRef = {
+    logDebug("Adding shutdown hook") // force eager creation of logger
+    ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+      logInfo("Shutdown hook called")
+      BroadcastManager.this.stop()
+    }
+  }
+
 }
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index ca9668f3e389..e192302b1e9e 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -38,6 +38,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea
  *
  * The mechanism is as follows:
  *
+ * 1. For driver-side broadcast (when isExecutorSide is false):
  * The driver divides the serialized object into small chunks and
  * stores those chunks in the BlockManager of the driver.
  *
@@ -51,6 +52,17 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea
  *
  * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
  *
+ * 2. For executor-side broadcast (when isExecutorSide is true):
+ * One executor divides the serialized object into small chunks and
+ * stores those chunks in the BlockManager of the executor.
+ *
+ * On other executors, each executor first attempts to fetch the object from its BlockManager. If
+ * it does not exist, it then uses remote fetches to fetch the small chunks from
+ * other executors if available. Once it gets the chunks, it puts the chunks in its own
+ * BlockManager, ready for other executors to fetch from.
+ *
+ * In executor-side broadcast, the driver never holds the broadcast data.
+ *
  * @param obj object to broadcast
  * @param id A unique identifier for the broadcast variable.
  * @param isExecutorSide Whether this is an executor-side broadcast variable.
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 99a2e6da7cee..fbd2b36db8ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -943,7 +943,8 @@ abstract class RDD[T: ClassTag](
    *
    * User should pass in a translate function to compute the broadcast value from the rdd.
    */
-  def broadcast[U: ClassTag](transFunc: TransFunc[T, U]): Broadcast[U] = withScope {
+  @Since("2.1.0")
+  private[spark] def broadcast[U: ClassTag](transFunc: TransFunc[T, U]): Broadcast[U] = withScope {
     val bc = if (partitions.size > 0) {
       val id = sc.env.broadcastManager.newBroadcastId
@@ -974,7 +975,14 @@ abstract class RDD[T: ClassTag](
     bc
   }
 
-  // executor-side broadcast api
+  /**
+   * Executor-side broadcast API. It broadcasts the RDD to the cluster from an executor,
+   * returning a [[org.apache.spark.broadcast.Broadcast]] object for reading it in
+   * distributed functions. The variable will be sent to each executor only once.
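+   *
+   * A usage sketch (mirroring the RDDSuite tests in this patch; `rdd` here is a
+   * placeholder for any RDD[Int], and the sum is computed once, on one executor):
+   * {{{
+   *   val sum = rdd.broadcast { iter => iter.sum }
+   *   rdd.map(x => x + sum.value)
+   * }}}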
+   *
+   * @param f a function used to compute the broadcast value from the RDD.
+   */
+  @Since("2.1.0")
   def broadcast[U: ClassTag](f: Iterator[T] => U): Broadcast[U] = withScope {
     val transFunc = new TransFunc[T, U] {
       override def transform(rows: Array[T]): U = {
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 9e30286b5ccd..a677fb59c190 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -23,7 +23,7 @@
 import java.util.UUID
 
 import org.apache.hadoop.fs._
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.Logging
@@ -49,8 +49,8 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     logError("Failed to create any local dir.")
     System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
   }
-  private[spark] lazy val hdfsDir =
-    Option(new Path(conf.get("spark.hdfs.dir", s"/tmp/spark/${conf.getAppId}_blocks")))
+
+  private[spark] lazy val broadcastBackupDir = SparkEnv.get.broadcastManager.hdfsBackupDir
 
   // The content of subDirs is immutable but the content of subDirs(i) is mutable. And the content
   // of subDirs(i) is protected by the lock of subDirs(i)
@@ -150,7 +150,7 @@
 
   def persistBroadcastPiece(id: BlockId, block: ByteBuffer): Unit = {
-    hdfsDir.foreach { dirPath =>
+    broadcastBackupDir.foreach { dirPath =>
       val blockFile = id.toString
       val filePath = new Path(dirPath, blockFile)
       var fs: FileSystem = null
@@ -197,7 +197,7 @@
   }
 
   def cleanBroadcastPieces(id: Long): Unit = {
-    hdfsDir.foreach { dirPath =>
+    broadcastBackupDir.foreach { dirPath =>
       val fileFilter = new PathFilter {
         override def accept(pathname: Path): Boolean = {
           pathname.getName.startsWith(s"broadcast_${id}_piece")
@@ -222,7 +222,7 @@
   def getBroadcastPiece(id: BlockId): Option[ChunkedByteBuffer] = {
     var inputStream: FSDataInputStream = null
     try {
-      hdfsDir.map { dirPath =>
+      broadcastBackupDir.map { dirPath =>
         val blockFile = id.toString
         val filePath = new Path(dirPath, blockFile)
         val fs = filePath.getFileSystem(SparkHadoopUtil.get.conf)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index b8975de95778..0f05a83e6c19 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -53,44 +53,44 @@
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val transFun = new TransFunc[Int, Int] {
       override def transform(rows: Array[Int]): Int = {
-        if (rows.size > 0) rows.apply(0) else 10
+        if (rows.size > 0) rows.reduce(_ + _) else 0
       }
     }
     val b1 = nums.broadcast(transFun)
     val b2 = nums.broadcast { iter =>
-      if (iter.hasNext) iter.next() else 10
+      if (iter.hasNext) iter.reduce(_ + _) else 0
     }
-    assert(b1.value == 1)
-    assert(b2.value == 1)
+    assert(b1.value == 10)
+    assert(b2.value == 10)
   }
 
   test("executor broadcast --- empty rdd") {
     val empty = sc.makeRDD(Array.empty[Int], 2)
     val transFun = new
TransFunc[Int, Int] { - override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.apply(0) else 10 + override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.reduce(_ + _) else 0 } val b1 = empty.broadcast(transFun) - assert(b1.value == 10) + assert(b1.value == 0) val b2 = empty.broadcast { iter => - if (iter.hasNext) iter.next() else 10 + if (iter.hasNext) iter.reduce(_ + _) else 0 } - assert(b2.value == 10) + assert(b2.value == 0) } test("executor broadcast --- broadcast data lost from executor") { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val transFun = new TransFunc[Int, Int] { - override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.apply(0) else 10 + override def transform(rows: Array[Int]): Int = if (rows.size > 0) rows.reduce(_ + _) else 0 } val b1 = nums.broadcast(transFun) sc.env.blockManager.removeBroadcast(b1.id, false) - assert(b1.value == 1) + assert(b1.value == 10) val b2 = nums.broadcast{ iter => - if (iter.hasNext) iter.next() else 10 + if (iter.hasNext) iter.reduce(_ + _) else 0 } sc.env.blockManager.removeBroadcast(b2.id, false) - assert(b2.value == 1) + assert(b2.value == 10) } test("basic operations") { From 1bb8a44d1f58c30e041d73a24c19cf902296cd3a Mon Sep 17 00:00:00 2001 From: wangfei Date: Thu, 29 Sep 2016 17:19:45 +0800 Subject: [PATCH 08/13] fix dagschedulersuite --- .../scala/org/apache/spark/broadcast/BroadcastManager.scala | 4 +++- .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 ++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index ef740da5a3f9..f6d6d744ba2b 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -67,7 +67,9 @@ private[spark] class BroadcastManager( hdfsBackupDir.foreach { dirPath => try { val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) - fs.delete(dirPath, true) + if (fs.exists(dirPath)) { + fs.delete(dirPath, true) + } } catch { case e: IOException => logWarning(s"Failed to delete broadcast temp dir $dirPath.", e) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 5e8a854e46a0..298c1d4b52a5 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -103,6 +103,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou import DAGSchedulerSuite._ val conf = new SparkConf + conf.set("spark.app.id", "DAGSchedulerSuite") + /** Set of TaskSets the DAGScheduler has requested executed. 
*/ val taskSets = scala.collection.mutable.Buffer[TaskSet]() From cdab8854466fe816663b4fa1a981e0654c526658 Mon Sep 17 00:00:00 2001 From: wangfei Date: Fri, 30 Dec 2016 15:36:23 +0800 Subject: [PATCH 09/13] improve and fix compile error --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 6 ++++-- .../apache/spark/broadcast/TorrentBroadcastFactory.scala | 2 +- .../scala/org/apache/spark/storage/DiskBlockManager.scala | 3 +++ .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index e192302b1e9e..a8e21ef26dc9 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -107,7 +107,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( def getNumBlocks: Int = numBlocks /** Total number of blocks this broadcast variable contains. */ - private val numBlocks: Int = if (!isExecutorSide) writeBlocks(obj) else nBlocks.getOrElse(-1) + private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj)) /** Whether to generate checksum for blocks or not. */ private var checksumEnabled: Boolean = false @@ -154,7 +154,9 @@ private[spark] class TorrentBroadcast[T: ClassTag]( checksums(i) = calcChecksum(block) } val pieceId = BroadcastBlockId(id, "piece" + i) - blockManager.persistBroadcastPiece(pieceId, block) + if (isExecutorSide) { + blockManager.persistBroadcastPiece(pieceId, block) + } val bytes = new ChunkedByteBuffer(block.duplicate()) if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) { throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager") diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 0e5494d08068..627a13bab3dd 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -42,7 +42,7 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { } override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int = { - new TorrentBroadcast[T](value_, id).getNumBlocks + new TorrentBroadcast[T](value_, id, true).getNumBlocks } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index a677fb59c190..a07211ee599c 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -205,6 +205,9 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea } try { val fs = dirPath.getFileSystem(SparkHadoopUtil.get.conf) + if (!fs.exists(dirPath)) { + return + } val files = fs.listStatus(dirPath, fileFilter).map(_.getPath) for (file <- files) { if (fs.exists(file)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index c27b815dfa08..c09d43c4c6ed 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1071,7 +1071,7 @@ class DatasetSuite 
extends QueryTest with SharedSQLContext { agg.queryExecution.executedPlan.collectFirst { case ShuffleExchange(_, _: RDDScanExec, _) => - case BroadcastExchangeExec(_, _: RDDScanExec) => + case BroadcastExchangeExec(_, _: RDDScanExec, _) => }.foreach { _ => fail( "No Exchange should be inserted above RDDScanExec since the checkpointed Dataset " + From 26b03b2e846509cfa00ca155288a93faae2576a3 Mon Sep 17 00:00:00 2001 From: wangfei Date: Tue, 3 Jan 2017 16:08:11 +0800 Subject: [PATCH 10/13] checkSum leads to NPE --- .../spark/broadcast/BroadcastFactory.scala | 5 +++-- .../spark/broadcast/BroadcastManager.scala | 7 ++++--- .../spark/broadcast/TorrentBroadcast.scala | 21 ++++++++++++------- .../broadcast/TorrentBroadcastFactory.scala | 10 +++++---- .../main/scala/org/apache/spark/rdd/RDD.scala | 14 +++++++------ 5 files changed, 34 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index b3dc467a523a..f4054655fa9c 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -48,10 +48,11 @@ private[spark] trait BroadcastFactory { def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int): Broadcast[T] + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] // Called from executor to put broadcast data to blockmanager. - def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int + def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index f6d6d744ba2b..42b6a4a33b9a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -93,7 +93,7 @@ private[spark] class BroadcastManager( def uploadBroadcast[T: ClassTag]( value_ : T, id: Long - ): Int = { + ): Seq[Int] = { broadcastFactory.uploadBroadcast[T](value_, id) } @@ -101,8 +101,9 @@ private[spark] class BroadcastManager( def newExecutorBroadcast[T: ClassTag]( value_ : T, id: Long, - nBlocks: Int): Broadcast[T] = { - broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks) + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] = { + broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks, cSums) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index a8e21ef26dc9..0497df5b9766 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag]( obj: T, id: Long, isExecutorSide: Boolean = false, - nBlocks: Option[Int] = None) + nBlocks: Option[Int] = None, + cSums: Option[Array[Int]] = None) extends Broadcast[T](id) with Logging with Serializable { /** @@ -90,6 +91,11 @@ private[spark] class TorrentBroadcast[T: ClassTag]( /** Size of each block. Default value is 4MB. This value is only read by the broadcaster. */ @transient private var blockSize: Int = _ + /** Whether to generate checksum for blocks or not. 
*/ + private var checksumEnabled: Boolean = false + /** The checksum for all the blocks. */ + private var checksums: Array[Int] = cSums.getOrElse(null) + private def setConf(conf: SparkConf) { compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { Some(CompressionCodec.createCodec(conf)) @@ -104,15 +110,14 @@ private[spark] class TorrentBroadcast[T: ClassTag]( private val broadcastId = BroadcastBlockId(id) - def getNumBlocks: Int = numBlocks + def getNumBlocksAndChecksums: Seq[Int] = if (checksumEnabled) { + Seq(numBlocks) ++ checksums + } else { + Seq(numBlocks) + } /** Total number of blocks this broadcast variable contains. */ - private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj)) - - /** Whether to generate checksum for blocks or not. */ - private var checksumEnabled: Boolean = false - /** The checksum for all the blocks. */ - private var checksums: Array[Int] = _ + private val numBlocks: Int = nBlocks.getOrElse(writeBlocks(obj)) // this must be after checkSums override protected def getValue() = { _value diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 627a13bab3dd..1bb2edb668a4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -37,12 +37,14 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { override def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int): Broadcast[T] = { - new TorrentBroadcast[T](value, id, true, Option(nBlocks)) + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] = { + new TorrentBroadcast[T](value, id, true, Option(nBlocks), Option(cSums)) } - override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Int = { - new TorrentBroadcast[T](value_, id, true).getNumBlocks + override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] = { + val executorBroadcast = new TorrentBroadcast[T](value_, id, true) + executorBroadcast.getNumBlocksAndChecksums } override def stop() { } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index fbd2b36db8ba..a9cd7e4ab4f2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -949,15 +949,17 @@ abstract class RDD[T: ClassTag]( val id = sc.env.broadcastManager.newBroadcastId // first: write blocks to block manager from executor. 
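+      // Only the block count and checksums travel back to the driver here; the
+      // transformed value itself stays in the uploading executor's BlockManager.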
- val nBlocks = coalesce(1).mapPartitions { iter => - val numBlocks = - SparkEnv.get.broadcastManager.uploadBroadcast(transFunc.transform(iter.toArray), id) - Seq(numBlocks).iterator - }.collect().head + val numBlocksAndChecksums = coalesce(1).mapPartitions { iter => + SparkEnv.get.broadcastManager + .uploadBroadcast(transFunc.transform(iter.toArray), id).iterator + }.collect() // then: create broadcast from driver, this will not write blocks val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( - transFunc.transform(Array.empty[T]), id, nBlocks) + transFunc.transform(Array.empty[T]), + id, + numBlocksAndChecksums.head, + numBlocksAndChecksums.tail) val callSite = sc.getCallSite logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm) From 76dfc20b48f9e65e41bb79492629a75e833e732e Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 4 Jan 2017 16:38:23 +0800 Subject: [PATCH 11/13] clean check sum --- .../org/apache/spark/broadcast/BroadcastFactory.scala | 3 +-- .../org/apache/spark/broadcast/BroadcastManager.scala | 5 ++--- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 7 +++---- .../apache/spark/broadcast/TorrentBroadcastFactory.scala | 5 ++--- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- 5 files changed, 10 insertions(+), 14 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index f4054655fa9c..070eba479c98 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -48,8 +48,7 @@ private[spark] trait BroadcastFactory { def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int, - cSums: Array[Int]): Broadcast[T] + nBlocks: Int): Broadcast[T] // Called from executor to put broadcast data to blockmanager. def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index 42b6a4a33b9a..df9f7ee5e358 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -101,9 +101,8 @@ private[spark] class BroadcastManager( def newExecutorBroadcast[T: ClassTag]( value_ : T, id: Long, - nBlocks: Int, - cSums: Array[Int]): Broadcast[T] = { - broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks, cSums) + nBlocks: Int): Broadcast[T] = { + broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 0497df5b9766..2464f315bad6 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -74,8 +74,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( obj: T, id: Long, isExecutorSide: Boolean = false, - nBlocks: Option[Int] = None, - cSums: Option[Array[Int]] = None) + @transient val nBlocks: Option[Int] = None) extends Broadcast[T](id) with Logging with Serializable { /** @@ -94,7 +93,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( /** Whether to generate checksum for blocks or not. 
*/ private var checksumEnabled: Boolean = false /** The checksum for all the blocks. */ - private var checksums: Array[Int] = cSums.getOrElse(null) + private var checksums: Array[Int] = _ private def setConf(conf: SparkConf) { compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { @@ -104,7 +103,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( } // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024 - checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true) + checksumEnabled = conf.getBoolean("spark.broadcast.checksum", false) } setConf(SparkEnv.get.conf) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 1bb2edb668a4..1349ea773ef3 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -37,9 +37,8 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { override def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int, - cSums: Array[Int]): Broadcast[T] = { - new TorrentBroadcast[T](value, id, true, Option(nBlocks), Option(cSums)) + nBlocks: Int): Broadcast[T] = { + new TorrentBroadcast[T](value, id, true, Option(nBlocks)) } override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index a9cd7e4ab4f2..d61bdc2187cd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -953,13 +953,13 @@ abstract class RDD[T: ClassTag]( SparkEnv.get.broadcastManager .uploadBroadcast(transFunc.transform(iter.toArray), id).iterator }.collect() + val nblocks = numBlocksAndChecksums.head // then: create broadcast from driver, this will not write blocks val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( transFunc.transform(Array.empty[T]), id, - numBlocksAndChecksums.head, - numBlocksAndChecksums.tail) + nblocks) val callSite = sc.getCallSite logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm) From 482c0c84e41fa6134e372589e571f22d3cfecd07 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 4 Jan 2017 16:38:36 +0800 Subject: [PATCH 12/13] Revert "clean check sum" This reverts commit 76dfc20b48f9e65e41bb79492629a75e833e732e. 
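The checksum plumbing is still needed: for an executor-side broadcast the
driver never runs writeBlocks(), so the checksums computed by the uploading
executor have to be handed to the driver-side TorrentBroadcast explicitly.
A minimal sketch of the restored round-trip (names from this patch series;
`value`, `emptyValue` and `id` are placeholders):

    // On one executor: write the blocks, return Seq(numBlocks) ++ checksums.
    val meta: Seq[Int] = SparkEnv.get.broadcastManager.uploadBroadcast(value, id)

    // On the driver: rebuild the broadcast without rewriting any blocks, and
    // pass the checksums through so that readBlocks() can verify each piece
    // (leaving checksums null here is what caused the NPE fixed earlier).
    val bc = SparkEnv.get.broadcastManager.newExecutorBroadcast(
      emptyValue, id, meta.head, meta.tail.toArray)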
--- .../org/apache/spark/broadcast/BroadcastFactory.scala | 3 ++- .../org/apache/spark/broadcast/BroadcastManager.scala | 5 +++-- .../org/apache/spark/broadcast/TorrentBroadcast.scala | 7 ++++--- .../apache/spark/broadcast/TorrentBroadcastFactory.scala | 5 +++-- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 4 ++-- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala index 070eba479c98..f4054655fa9c 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala @@ -48,7 +48,8 @@ private[spark] trait BroadcastFactory { def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int): Broadcast[T] + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] // Called from executor to put broadcast data to blockmanager. def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala index df9f7ee5e358..42b6a4a33b9a 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastManager.scala @@ -101,8 +101,9 @@ private[spark] class BroadcastManager( def newExecutorBroadcast[T: ClassTag]( value_ : T, id: Long, - nBlocks: Int): Broadcast[T] = { - broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks) + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] = { + broadcastFactory.newExecutorBroadcast[T](value_, id, nBlocks, cSums) } def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) { diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 2464f315bad6..0497df5b9766 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -74,7 +74,8 @@ private[spark] class TorrentBroadcast[T: ClassTag]( obj: T, id: Long, isExecutorSide: Boolean = false, - @transient val nBlocks: Option[Int] = None) + nBlocks: Option[Int] = None, + cSums: Option[Array[Int]] = None) extends Broadcast[T](id) with Logging with Serializable { /** @@ -93,7 +94,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( /** Whether to generate checksum for blocks or not. */ private var checksumEnabled: Boolean = false /** The checksum for all the blocks. 
*/ - private var checksums: Array[Int] = _ + private var checksums: Array[Int] = cSums.getOrElse(null) private def setConf(conf: SparkConf) { compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) { @@ -103,7 +104,7 @@ private[spark] class TorrentBroadcast[T: ClassTag]( } // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024 - checksumEnabled = conf.getBoolean("spark.broadcast.checksum", false) + checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true) } setConf(SparkEnv.get.conf) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala index 1349ea773ef3..1bb2edb668a4 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcastFactory.scala @@ -37,8 +37,9 @@ private[spark] class TorrentBroadcastFactory extends BroadcastFactory { override def newExecutorBroadcast[T: ClassTag]( value: T, id: Long, - nBlocks: Int): Broadcast[T] = { - new TorrentBroadcast[T](value, id, true, Option(nBlocks)) + nBlocks: Int, + cSums: Array[Int]): Broadcast[T] = { + new TorrentBroadcast[T](value, id, true, Option(nBlocks), Option(cSums)) } override def uploadBroadcast[T: ClassTag](value_ : T, id: Long): Seq[Int] = { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d61bdc2187cd..a9cd7e4ab4f2 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -953,13 +953,13 @@ abstract class RDD[T: ClassTag]( SparkEnv.get.broadcastManager .uploadBroadcast(transFunc.transform(iter.toArray), id).iterator }.collect() - val nblocks = numBlocksAndChecksums.head // then: create broadcast from driver, this will not write blocks val res = SparkEnv.get.broadcastManager.newExecutorBroadcast( transFunc.transform(Array.empty[T]), id, - nblocks) + numBlocksAndChecksums.head, + numBlocksAndChecksums.tail) val callSite = sc.getCallSite logInfo("Created executor side broadcast " + res.id + " from " + callSite.shortForm) From 4a547a63913f95e990f0f16bf6ce559a25e3b5f7 Mon Sep 17 00:00:00 2001 From: wangfei Date: Wed, 4 Jan 2017 17:47:33 +0800 Subject: [PATCH 13/13] added transient for obj, option leads to obj as a field of TorrentBroadcast --- .../scala/org/apache/spark/broadcast/TorrentBroadcast.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 0497df5b9766..ce52b8efa533 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -71,7 +71,7 @@ import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStrea * @param nBlocks how many blocks for executor broadcast. */ private[spark] class TorrentBroadcast[T: ClassTag]( - obj: T, + @transient val obj: T, id: Long, isExecutorSide: Boolean = false, nBlocks: Option[Int] = None,