From 840b3cec3383fb8a1943c863f4db313d694f8922 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 20 Jul 2014 22:30:59 -0700 Subject: [PATCH 1/3] SPARK-2582. Make Block Manager Master pluggable. This patch turns BlockManagerMaster into a trait; the existing implementation is renamed to StandaloneBlockManagerMaster and becomes one of the possible implementations. An additional (as yet undocumented) configuration parameter, spark.blockmanager.type (default "standalone"), is added to select which BlockManagerMaster implementation to use. At some point, when we add BlockManagerMasters which write metadata to HDFS or replicate, we can add other possible values that select those implementations. There is no change in current behavior. Other implementations must also be required to use the current Akka actor itself, so that the code in the BlockManager does not need to care which implementation is used on the BlockManagerMaster (BMM) side. I am not sure how to enforce this. It is not too much of a concern, as we don't have to make it externally pluggable - the only options would be implementations that are part of Spark - so this should be fairly easy to enforce. 
--- .../scala/org/apache/spark/SparkEnv.scala | 12 +- .../spark/storage/BlockManagerMaster.scala | 173 ++---------- .../StandaloneBlockManagerMaster.scala | 256 ++++++++++++++++++ .../apache/spark/storage/ThreadingTest.scala | 2 +- .../spark/broadcast/BroadcastSuite.scala | 18 +- .../spark/scheduler/DAGSchedulerSuite.scala | 4 +- .../spark/storage/BlockManagerSuite.scala | 2 +- 7 files changed, 297 insertions(+), 170 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 8f70744d804d..a82a0e53ebc4 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -215,9 +215,15 @@ object SparkEnv extends Logging { "MapOutputTracker", new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) - val blockManagerMaster = new BlockManagerMaster(registerOrLookup( - "BlockManagerMaster", - new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) + val blockManagerMasterType = conf.get("spark.blockmanager.type", "standalone") + var blockManagerMaster: BlockManagerMaster = null + blockManagerMasterType match { + case _ => { // Since currently only one option exists, this is what is to be done in any case. 
+ blockManagerMaster = new StandaloneBlockManagerMaster(registerOrLookup( + "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) + } + } + val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf, securityManager, mapOutputTracker) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 7897fade2df2..9957752eee7a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.spark.storage import scala.concurrent.{Await, Future} @@ -28,35 +27,20 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging { - val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3) - val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000) - - val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" - - val timeout = AkkaUtils.askTimeout(conf) +trait BlockManagerMaster extends Logging { /** Remove a dead executor from the driver actor. This is only called on the driver side. */ - def removeExecutor(execId: String) { - tell(RemoveExecutor(execId)) - logInfo("Removed " + execId + " successfully in removeExecutor") - } + def removeExecutor(execId: String) /** * Send the driver actor a heart beat from the slave. Returns true if everything works out, * false if the driver does not know about the given block manager, which means the block * manager should re-register. 
*/ - def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { - askDriverWithReply[Boolean](HeartBeat(blockManagerId)) - } + def sendHeartBeat(blockManagerId: BlockManagerId): Boolean /** Register the BlockManager's id with the driver. */ - def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - logInfo("Trying to register BlockManager") - tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)) - logInfo("Registered BlockManager") - } + def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) def updateBlockInfo( blockManagerId: BlockManagerId, @@ -64,84 +48,37 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log storageLevel: StorageLevel, memSize: Long, diskSize: Long, - tachyonSize: Long): Boolean = { - val res = askDriverWithReply[Boolean]( - UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) - logInfo("Updated info of block " + blockId) - res - } + tachyonSize: Long): Boolean /** Get locations of the blockId from the driver */ - def getLocations(blockId: BlockId): Seq[BlockManagerId] = { - askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId)) - } + def getLocations(blockId: BlockId): Seq[BlockManagerId] /** Get locations of multiple blockIds from the driver */ - def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { - askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) - } + def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] /** * Check if block manager master has a block. Note that this can be used to check for only * those blocks that are reported to block manager master. 
*/ - def contains(blockId: BlockId) = { - !getLocations(blockId).isEmpty - } + def contains(blockId: BlockId) /** Get ids of other nodes in the cluster from the driver */ - def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { - val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) - if (result.length != numPeers) { - throw new SparkException( - "Error getting peers, only got " + result.size + " instead of " + numPeers) - } - result - } + def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] /** * Remove a block from the slaves that have it. This can only be used to remove * blocks that the driver knows about. */ - def removeBlock(blockId: BlockId) { - askDriverWithReply(RemoveBlock(blockId)) - } + def removeBlock(blockId: BlockId) /** Remove all blocks belonging to the given RDD. */ - def removeRdd(rddId: Int, blocking: Boolean) { - val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId)) - future.onFailure { - case e: Throwable => logError("Failed to remove RDD " + rddId, e) - } - if (blocking) { - Await.result(future, timeout) - } - } + def removeRdd(rddId: Int, blocking: Boolean) /** Remove all blocks belonging to the given shuffle. */ - def removeShuffle(shuffleId: Int, blocking: Boolean) { - val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) - future.onFailure { - case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e) - } - if (blocking) { - Await.result(future, timeout) - } - } + def removeShuffle(shuffleId: Int, blocking: Boolean) /** Remove all blocks belonging to the given broadcast. 
*/ - def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { - val future = askDriverWithReply[Future[Seq[Int]]]( - RemoveBroadcast(broadcastId, removeFromMaster)) - future.onFailure { - case e: Throwable => - logError("Failed to remove broadcast " + broadcastId + - " with removeFromMaster = " + removeFromMaster, e) - } - if (blocking) { - Await.result(future, timeout) - } - } + def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) /** * Return the memory status for each block manager, in the form of a map from @@ -149,13 +86,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log * amount of memory allocated for the block manager, while the second is the * amount of remaining memory. */ - def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { - askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) - } + def getMemoryStatus: Map[BlockManagerId, (Long, Long)] - def getStorageStatus: Array[StorageStatus] = { - askDriverWithReply[Array[StorageStatus]](GetStorageStatus) - } + def getStorageStatus: Array[StorageStatus] /** * Return the block's status on all block managers, if any. NOTE: This is a @@ -166,25 +99,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log * by all block managers. */ def getBlockStatus( - blockId: BlockId, - askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = { - val msg = GetBlockStatus(blockId, askSlaves) - /* - * To avoid potential deadlocks, the use of Futures is necessary, because the master actor - * should not block on waiting for a block manager, which can in turn be waiting for the - * master actor for a response to a prior message. 
- */ - val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg) - val (blockManagerIds, futures) = response.unzip - val result = Await.result(Future.sequence(futures), timeout) - if (result == null) { - throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId) - } - val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]] - blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) => - status.map { s => (blockManagerId, s) } - }.toMap - } + blockId: BlockId, + askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] /** * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This @@ -196,60 +112,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log */ def getMatchingBlockIds( filter: BlockId => Boolean, - askSlaves: Boolean): Seq[BlockId] = { - val msg = GetMatchingBlockIds(filter, askSlaves) - val future = askDriverWithReply[Future[Seq[BlockId]]](msg) - Await.result(future, timeout) - } + askSlaves: Boolean): Seq[BlockId] /** Stop the driver actor, called only on the Spark driver node */ - def stop() { - if (driverActor != null) { - tell(StopBlockManagerMaster) - driverActor = null - logInfo("BlockManagerMaster stopped") - } - } - - /** Send a one-way message to the master actor, to which we expect it to reply with true. */ - private def tell(message: Any) { - if (!askDriverWithReply[Boolean](message)) { - throw new SparkException("BlockManagerMasterActor returned false, expected true.") - } - } - - /** - * Send a message to the driver actor and get its result within a default timeout, or - * throw a SparkException if this fails. 
- */ - private def askDriverWithReply[T](message: Any): T = { - // TODO: Consider removing multiple attempts - if (driverActor == null) { - throw new SparkException("Error sending message to BlockManager as driverActor is null " + - "[message = " + message + "]") - } - var attempts = 0 - var lastException: Exception = null - while (attempts < AKKA_RETRY_ATTEMPTS) { - attempts += 1 - try { - val future = driverActor.ask(message)(timeout) - val result = Await.result(future, timeout) - if (result == null) { - throw new SparkException("BlockManagerMaster returned null") - } - return result.asInstanceOf[T] - } catch { - case ie: InterruptedException => throw ie - case e: Exception => - lastException = e - logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e) - } - Thread.sleep(AKKA_RETRY_INTERVAL_MS) - } - - throw new SparkException( - "Error sending message to BlockManagerMaster [message = " + message + "]", lastException) - } + def stop() } diff --git a/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala new file mode 100644 index 000000000000..65c9b054cc17 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.storage + +import scala.concurrent.{Await, Future} +import scala.concurrent.ExecutionContext.Implicits.global + +import akka.actor._ +import akka.pattern.ask + +import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.storage.BlockManagerMessages._ +import org.apache.spark.util.AkkaUtils + +private[spark] +class StandaloneBlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging +with BlockManagerMaster { + val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3) + val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000) + + val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" + + val timeout = AkkaUtils.askTimeout(conf) + + /** Remove a dead executor from the driver actor. This is only called on the driver side. */ + def removeExecutor(execId: String) { + tell(RemoveExecutor(execId)) + logInfo("Removed " + execId + " successfully in removeExecutor") + } + + /** + * Send the driver actor a heart beat from the slave. Returns true if everything works out, + * false if the driver does not know about the given block manager, which means the block + * manager should re-register. + */ + def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { + askDriverWithReply[Boolean](HeartBeat(blockManagerId)) + } + + /** Register the BlockManager's id with the driver. 
*/ + def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + logInfo("Trying to register BlockManager") + tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)) + logInfo("Registered BlockManager") + } + + def updateBlockInfo( + blockManagerId: BlockManagerId, + blockId: BlockId, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long, + tachyonSize: Long): Boolean = { + val res = askDriverWithReply[Boolean]( + UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize)) + logInfo("Updated info of block " + blockId) + res + } + + /** Get locations of the blockId from the driver */ + def getLocations(blockId: BlockId): Seq[BlockManagerId] = { + askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId)) + } + + /** Get locations of multiple blockIds from the driver */ + def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { + askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) + } + + /** + * Check if block manager master has a block. Note that this can be used to check for only + * those blocks that are reported to block manager master. + */ + def contains(blockId: BlockId) = { + !getLocations(blockId).isEmpty + } + + /** Get ids of other nodes in the cluster from the driver */ + def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { + val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) + if (result.length != numPeers) { + throw new SparkException( + "Error getting peers, only got " + result.size + " instead of " + numPeers) + } + result + } + + /** + * Remove a block from the slaves that have it. This can only be used to remove + * blocks that the driver knows about. + */ + def removeBlock(blockId: BlockId) { + askDriverWithReply(RemoveBlock(blockId)) + } + + /** Remove all blocks belonging to the given RDD. 
*/ + def removeRdd(rddId: Int, blocking: Boolean) { + val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId)) + future.onFailure { + case e: Throwable => logError("Failed to remove RDD " + rddId, e) + } + if (blocking) { + Await.result(future, timeout) + } + } + + /** Remove all blocks belonging to the given shuffle. */ + def removeShuffle(shuffleId: Int, blocking: Boolean) { + val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) + future.onFailure { + case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e) + } + if (blocking) { + Await.result(future, timeout) + } + } + + /** Remove all blocks belonging to the given broadcast. */ + def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { + val future = askDriverWithReply[Future[Seq[Int]]]( + RemoveBroadcast(broadcastId, removeFromMaster)) + future.onFailure { + case e: Throwable => + logError("Failed to remove broadcast " + broadcastId + + " with removeFromMaster = " + removeFromMaster, e) + } + if (blocking) { + Await.result(future, timeout) + } + } + + /** + * Return the memory status for each block manager, in the form of a map from + * the block manager's id to two long values. The first value is the maximum + * amount of memory allocated for the block manager, while the second is the + * amount of remaining memory. + */ + def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { + askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) + } + + def getStorageStatus: Array[StorageStatus] = { + askDriverWithReply[Array[StorageStatus]](GetStorageStatus) + } + + /** + * Return the block's status on all block managers, if any. NOTE: This is a + * potentially expensive operation and should only be used for testing. + * + * If askSlaves is true, this invokes the master to query each block manager for the most + * updated block statuses. 
This is useful when the master is not informed of the given block + * by all block managers. + */ + def getBlockStatus( + blockId: BlockId, + askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = { + val msg = GetBlockStatus(blockId, askSlaves) + /* + * To avoid potential deadlocks, the use of Futures is necessary, because the master actor + * should not block on waiting for a block manager, which can in turn be waiting for the + * master actor for a response to a prior message. + */ + val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg) + val (blockManagerIds, futures) = response.unzip + val result = Await.result(Future.sequence(futures), timeout) + if (result == null) { + throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId) + } + val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]] + blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) => + status.map { s => (blockManagerId, s) } + }.toMap + } + + /** + * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This + * is a potentially expensive operation and should only be used for testing. + * + * If askSlaves is true, this invokes the master to query each block manager for the most + * updated block statuses. This is useful when the master is not informed of the given block + * by all block managers. + */ + def getMatchingBlockIds( + filter: BlockId => Boolean, + askSlaves: Boolean): Seq[BlockId] = { + val msg = GetMatchingBlockIds(filter, askSlaves) + val future = askDriverWithReply[Future[Seq[BlockId]]](msg) + Await.result(future, timeout) + } + + /** Stop the driver actor, called only on the Spark driver node */ + def stop() { + if (driverActor != null) { + tell(StopBlockManagerMaster) + driverActor = null + logInfo("BlockManagerMaster stopped") + } + } + + /** Send a one-way message to the master actor, to which we expect it to reply with true. 
*/ + private def tell(message: Any) { + if (!askDriverWithReply[Boolean](message)) { + throw new SparkException("BlockManagerMasterActor returned false, expected true.") + } + } + + /** + * Send a message to the driver actor and get its result within a default timeout, or + * throw a SparkException if this fails. + */ + private def askDriverWithReply[T](message: Any): T = { + // TODO: Consider removing multiple attempts + if (driverActor == null) { + throw new SparkException("Error sending message to BlockManager as driverActor is null " + + "[message = " + message + "]") + } + var attempts = 0 + var lastException: Exception = null + while (attempts < AKKA_RETRY_ATTEMPTS) { + attempts += 1 + try { + val future = driverActor.ask(message)(timeout) + val result = Await.result(future, timeout) + if (result == null) { + throw new SparkException("BlockManagerMaster returned null") + } + return result.asInstanceOf[T] + } catch { + case ie: InterruptedException => throw ie + case e: Exception => + lastException = e + logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e) + } + Thread.sleep(AKKA_RETRY_INTERVAL_MS) + } + + throw new SparkException( + "Error sending message to BlockManagerMaster [message = " + message + "]", lastException) + } + +} diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index 328be158db68..4c2307383841 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -96,7 +96,7 @@ private[spark] object ThreadingTest { val actorSystem = ActorSystem("test") val conf = new SparkConf() val serializer = new KryoSerializer(conf) - val blockManagerMaster = new BlockManagerMaster( + val blockManagerMaster = new StandaloneBlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf) val 
blockManager = new BlockManager( diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 7c3d0208b195..c6244ace2fc3 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -121,7 +121,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { def getBlockIds(id: Long) = Seq[BroadcastBlockId](BroadcastBlockId(id)) // Verify that the broadcast file is created, and blocks are persisted only on the driver - def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) @@ -138,7 +138,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted in both the executors and the driver - def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === numSlaves + 1) @@ -151,7 +151,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver // is true. In the latter case, also verify that the broadcast file is deleted on the driver. 
- def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) val expectedNumBlocks = if (removeFromDriver) 0 else 1 @@ -193,7 +193,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted only on the driver - def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { blockIds.foreach { blockId => val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) @@ -207,7 +207,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted in both the executors and the driver - def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { blockIds.foreach { blockId => val statuses = bmm.getBlockStatus(blockId, askSlaves = true) if (blockId.field == "meta") { @@ -229,7 +229,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver // is true. 
- def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { + def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { val expectedNumBlocks = if (removeFromDriver) 0 else 1 val possiblyNot = if (removeFromDriver) "" else " not" blockIds.foreach { blockId => @@ -257,9 +257,9 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { numSlaves: Int, // used only when distributed = true broadcastConf: SparkConf, getBlockIds: Long => Seq[BroadcastBlockId], - afterCreation: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, - afterUsingBroadcast: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, - afterUnpersist: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, + afterCreation: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, + afterUsingBroadcast: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, + afterUnpersist: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, removeFromDriver: Boolean) { sc = if (distributed) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 9f498d579a09..bdec44fc1e88 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, FunSuiteLike} import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode -import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} +import org.apache.spark.storage.{BlockId, BlockManagerId, StandaloneBlockManagerMaster} import org.apache.spark.util.CallSite class BuggyDAGEventProcessActor extends Actor { @@ -92,7 +92,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F */ val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]] 
// stub out BlockManagerMaster.getLocations to use our cacheLocations - val blockManagerMaster = new BlockManagerMaster(null, conf) { + val blockManagerMaster = new StandaloneBlockManagerMaster(null, conf) { override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { blockIds.map { _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)). diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 23cb6905bfde..fd55220de5b9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -67,7 +67,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter this.actorSystem = actorSystem conf.set("spark.driver.port", boundPort.toString) - master = new BlockManagerMaster( + master = new StandaloneBlockManagerMaster( actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), conf) From bfc327e4dd6b5ed327b917f0f139ba912180a4b6 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Sun, 20 Jul 2014 23:40:39 -0700 Subject: [PATCH 2/3] SPARK-2582. 
Remove unnecessary braces and also remove Logging inheritance in BlockManagerMaster --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 ++-- .../scala/org/apache/spark/storage/BlockManagerMaster.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index a82a0e53ebc4..d879d6c047d5 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -218,10 +218,10 @@ object SparkEnv extends Logging { val blockManagerMasterType = conf.get("spark.blockmanager.type", "standalone") var blockManagerMaster: BlockManagerMaster = null blockManagerMasterType match { - case _ => { // Since currently only one option exists, this is what is to be done in any case. + case _ => + // Since currently only one option exists, this is what is to be done in any case. blockManagerMaster = new StandaloneBlockManagerMaster(registerOrLookup( "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf) - } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index 9957752eee7a..f87a12166e24 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -trait BlockManagerMaster extends Logging { +trait BlockManagerMaster { /** Remove a dead executor from the driver actor. This is only called on the driver side. */ def removeExecutor(execId: String) From eaaf500837dfbe67c42bdce8174b2910cedc7f07 Mon Sep 17 00:00:00 2001 From: Hari Shreedharan Date: Mon, 21 Jul 2014 00:26:21 -0700 Subject: [PATCH 3/3] Fix test failures. 
--- .../spark/storage/BlockManagerMaster.scala | 2 +- .../StandaloneBlockManagerMaster.scala | 35 ++++++++++--------- .../spark/broadcast/BroadcastSuite.scala | 18 +++++----- 3 files changed, 28 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index f87a12166e24..d87e5fa86238 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -60,7 +60,7 @@ trait BlockManagerMaster { * Check if block manager master has a block. Note that this can be used to check for only * those blocks that are reported to block manager master. */ - def contains(blockId: BlockId) + def contains(blockId: BlockId): Boolean /** Get ids of other nodes in the cluster from the driver */ def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] diff --git a/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala index 65c9b054cc17..df95c6df139f 100644 --- a/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala @@ -38,7 +38,7 @@ with BlockManagerMaster { val timeout = AkkaUtils.askTimeout(conf) /** Remove a dead executor from the driver actor. This is only called on the driver side. */ - def removeExecutor(execId: String) { + override def removeExecutor(execId: String) { tell(RemoveExecutor(execId)) logInfo("Removed " + execId + " successfully in removeExecutor") } @@ -48,18 +48,19 @@ with BlockManagerMaster { * false if the driver does not know about the given block manager, which means the block * manager should re-register. 
*/ - def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { + override def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = { askDriverWithReply[Boolean](HeartBeat(blockManagerId)) } /** Register the BlockManager's id with the driver. */ - def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + override def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, + slaveActor: ActorRef) { logInfo("Trying to register BlockManager") tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor)) logInfo("Registered BlockManager") } - def updateBlockInfo( + override def updateBlockInfo( blockManagerId: BlockManagerId, blockId: BlockId, storageLevel: StorageLevel, @@ -73,12 +74,12 @@ with BlockManagerMaster { } /** Get locations of the blockId from the driver */ - def getLocations(blockId: BlockId): Seq[BlockManagerId] = { + override def getLocations(blockId: BlockId): Seq[BlockManagerId] = { askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId)) } /** Get locations of multiple blockIds from the driver */ - def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { + override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = { askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds)) } @@ -86,12 +87,12 @@ with BlockManagerMaster { * Check if block manager master has a block. Note that this can be used to check for only * those blocks that are reported to block manager master. 
*/ - def contains(blockId: BlockId) = { + override def contains(blockId: BlockId) = { !getLocations(blockId).isEmpty } /** Get ids of other nodes in the cluster from the driver */ - def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { + override def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = { val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers)) if (result.length != numPeers) { throw new SparkException( @@ -104,12 +105,12 @@ with BlockManagerMaster { * Remove a block from the slaves that have it. This can only be used to remove * blocks that the driver knows about. */ - def removeBlock(blockId: BlockId) { + override def removeBlock(blockId: BlockId) { askDriverWithReply(RemoveBlock(blockId)) } /** Remove all blocks belonging to the given RDD. */ - def removeRdd(rddId: Int, blocking: Boolean) { + override def removeRdd(rddId: Int, blocking: Boolean) { val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId)) future.onFailure { case e: Throwable => logError("Failed to remove RDD " + rddId, e) @@ -120,7 +121,7 @@ with BlockManagerMaster { } /** Remove all blocks belonging to the given shuffle. */ - def removeShuffle(shuffleId: Int, blocking: Boolean) { + override def removeShuffle(shuffleId: Int, blocking: Boolean) { val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) future.onFailure { case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e) @@ -131,7 +132,7 @@ with BlockManagerMaster { } /** Remove all blocks belonging to the given broadcast. 
*/ - def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { + override def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { val future = askDriverWithReply[Future[Seq[Int]]]( RemoveBroadcast(broadcastId, removeFromMaster)) future.onFailure { @@ -150,11 +151,11 @@ with BlockManagerMaster { * amount of memory allocated for the block manager, while the second is the * amount of remaining memory. */ - def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { + override def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = { askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus) } - def getStorageStatus: Array[StorageStatus] = { + override def getStorageStatus: Array[StorageStatus] = { askDriverWithReply[Array[StorageStatus]](GetStorageStatus) } @@ -166,7 +167,7 @@ with BlockManagerMaster { * updated block statuses. This is useful when the master is not informed of the given block * by all block managers. */ - def getBlockStatus( + override def getBlockStatus( blockId: BlockId, askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = { val msg = GetBlockStatus(blockId, askSlaves) @@ -195,7 +196,7 @@ with BlockManagerMaster { * updated block statuses. This is useful when the master is not informed of the given block * by all block managers. 
*/ - def getMatchingBlockIds( + override def getMatchingBlockIds( filter: BlockId => Boolean, askSlaves: Boolean): Seq[BlockId] = { val msg = GetMatchingBlockIds(filter, askSlaves) @@ -204,7 +205,7 @@ with BlockManagerMaster { } /** Stop the driver actor, called only on the Spark driver node */ - def stop() { + override def stop() { if (driverActor != null) { tell(StopBlockManagerMaster) driverActor = null diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index c6244ace2fc3..7c3d0208b195 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -121,7 +121,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { def getBlockIds(id: Long) = Seq[BroadcastBlockId](BroadcastBlockId(id)) // Verify that the broadcast file is created, and blocks are persisted only on the driver - def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) @@ -138,7 +138,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted in both the executors and the driver - def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === numSlaves + 1) @@ -151,7 +151,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver // is true. 
In the latter case, also verify that the broadcast file is deleted on the driver. - def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { assert(blockIds.size === 1) val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) val expectedNumBlocks = if (removeFromDriver) 0 else 1 @@ -193,7 +193,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted only on the driver - def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterCreation(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { blockIds.foreach { blockId => val statuses = bmm.getBlockStatus(blockIds.head, askSlaves = true) assert(statuses.size === 1) @@ -207,7 +207,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { } // Verify that blocks are persisted in both the executors and the driver - def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterUsingBroadcast(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { blockIds.foreach { blockId => val statuses = bmm.getBlockStatus(blockId, askSlaves = true) if (blockId.field == "meta") { @@ -229,7 +229,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { // Verify that blocks are unpersisted on all executors, and on all nodes if removeFromDriver // is true. 
- def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: StandaloneBlockManagerMaster) { + def afterUnpersist(blockIds: Seq[BroadcastBlockId], bmm: BlockManagerMaster) { val expectedNumBlocks = if (removeFromDriver) 0 else 1 val possiblyNot = if (removeFromDriver) "" else " not" blockIds.foreach { blockId => @@ -257,9 +257,9 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { numSlaves: Int, // used only when distributed = true broadcastConf: SparkConf, getBlockIds: Long => Seq[BroadcastBlockId], - afterCreation: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, - afterUsingBroadcast: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, - afterUnpersist: (Seq[BroadcastBlockId], StandaloneBlockManagerMaster) => Unit, + afterCreation: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, + afterUsingBroadcast: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, + afterUnpersist: (Seq[BroadcastBlockId], BlockManagerMaster) => Unit, removeFromDriver: Boolean) { sc = if (distributed) {