diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ee731b22c03..738aa618f75f 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -215,9 +215,13 @@ object SparkEnv extends Logging {
       "MapOutputTracker",
       new MapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
 
-    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
-      "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+    val blockManagerMasterType = conf.get("spark.blockmanager.type", "standalone")
+    val blockManagerMaster: BlockManagerMaster = blockManagerMasterType match {
+      case _ =>
+        // Only one implementation exists so far, so every configured value selects it.
+        new StandaloneBlockManagerMaster(registerOrLookup(
+          "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+    }
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer,
       conf, securityManager, mapOutputTracker)
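The hunk above replaces the hard-wired `BlockManagerMaster` construction with a lookup of `spark.blockmanager.type` (defaulting to `"standalone"`) that is matched to an implementation. With a single implementation the `case _` catches everything; the indirection only pays off once a second implementation exists. A sketch of how the dispatch could grow, reusing the names in scope in `SparkEnv.create` (`registerOrLookup`, `isLocal`, `listenerBus`); `ReplicatedBlockManagerMaster` is hypothetical and not part of this patch:

```scala
val blockManagerMaster: BlockManagerMaster = blockManagerMasterType match {
  case "replicated" =>
    // Hypothetical future implementation; does not exist in this patch.
    new ReplicatedBlockManagerMaster(conf)
  case _ =>
    // "standalone" and any unrecognized value fall back to the actor-based master.
    new StandaloneBlockManagerMaster(registerOrLookup(
      "BlockManagerMaster", new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
}
```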
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 7897fade2df2..d87e5fa86238 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.spark.storage
 
 import scala.concurrent.{Await, Future}
@@ -28,35 +27,20 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
-  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
-  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
-
-  val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
-
-  val timeout = AkkaUtils.askTimeout(conf)
+trait BlockManagerMaster {
 
   /** Remove a dead executor from the driver actor. This is only called on the driver side. */
-  def removeExecutor(execId: String) {
-    tell(RemoveExecutor(execId))
-    logInfo("Removed " + execId + " successfully in removeExecutor")
-  }
+  def removeExecutor(execId: String)
 
   /**
    * Send the driver actor a heart beat from the slave. Returns true if everything works out,
    * false if the driver does not know about the given block manager, which means the block
    * manager should re-register.
    */
-  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
-    askDriverWithReply[Boolean](HeartBeat(blockManagerId))
-  }
+  def sendHeartBeat(blockManagerId: BlockManagerId): Boolean
 
   /** Register the BlockManager's id with the driver. */
-  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
-    logInfo("Trying to register BlockManager")
-    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
-    logInfo("Registered BlockManager")
-  }
+  def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef)
 
   def updateBlockInfo(
       blockManagerId: BlockManagerId,
@@ -64,84 +48,37 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
       storageLevel: StorageLevel,
       memSize: Long,
       diskSize: Long,
-      tachyonSize: Long): Boolean = {
-    val res = askDriverWithReply[Boolean](
-      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
-    logInfo("Updated info of block " + blockId)
-    res
-  }
+      tachyonSize: Long): Boolean
 
   /** Get locations of the blockId from the driver */
-  def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
-    askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
-  }
+  def getLocations(blockId: BlockId): Seq[BlockManagerId]
 
   /** Get locations of multiple blockIds from the driver */
-  def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
-    askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
-  }
+  def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]]
 
   /**
    * Check if block manager master has a block. Note that this can be used to check for only
    * those blocks that are reported to block manager master.
    */
-  def contains(blockId: BlockId) = {
-    !getLocations(blockId).isEmpty
-  }
+  def contains(blockId: BlockId): Boolean
 
   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
-  }
+  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId]
 
   /**
    * Remove a block from the slaves that have it. This can only be used to remove
    * blocks that the driver knows about.
    */
-  def removeBlock(blockId: BlockId) {
-    askDriverWithReply(RemoveBlock(blockId))
-  }
+  def removeBlock(blockId: BlockId)
 
   /** Remove all blocks belonging to the given RDD. */
-  def removeRdd(rddId: Int, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
-    future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeRdd(rddId: Int, blocking: Boolean)
 
   /** Remove all blocks belonging to the given shuffle. */
-  def removeShuffle(shuffleId: Int, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
-    future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeShuffle(shuffleId: Int, blocking: Boolean)
 
   /** Remove all blocks belonging to the given broadcast. */
-  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
-    val future = askDriverWithReply[Future[Seq[Int]]](
-      RemoveBroadcast(broadcastId, removeFromMaster))
-    future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
-    }
-    if (blocking) {
-      Await.result(future, timeout)
-    }
-  }
+  def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean)
 
   /**
    * Return the memory status for each block manager, in the form of a map from
@@ -149,13 +86,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
    * amount of memory allocated for the block manager, while the second is the
    * amount of remaining memory.
    */
-  def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
-    askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
-  }
+  def getMemoryStatus: Map[BlockManagerId, (Long, Long)]
 
-  def getStorageStatus: Array[StorageStatus] = {
-    askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
-  }
+  def getStorageStatus: Array[StorageStatus]
 
   /**
    * Return the block's status on all block managers, if any. NOTE: This is a
@@ -166,25 +99,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
    * by all block managers.
    */
   def getBlockStatus(
-      blockId: BlockId,
-      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
-    val msg = GetBlockStatus(blockId, askSlaves)
-    /*
-     * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
-     * should not block on waiting for a block manager, which can in turn be waiting for the
-     * master actor for a response to a prior message.
-     */
-    val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
-    val (blockManagerIds, futures) = response.unzip
-    val result = Await.result(Future.sequence(futures), timeout)
-    if (result == null) {
-      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
-    }
-    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
-    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
-      status.map { s => (blockManagerId, s) }
-    }.toMap
-  }
+      blockId: BlockId,
+      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus]
 
   /**
    * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
@@ -196,60 +112,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
    */
   def getMatchingBlockIds(
       filter: BlockId => Boolean,
-      askSlaves: Boolean): Seq[BlockId] = {
-    val msg = GetMatchingBlockIds(filter, askSlaves)
-    val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
-    Await.result(future, timeout)
-  }
+      askSlaves: Boolean): Seq[BlockId]
 
   /** Stop the driver actor, called only on the Spark driver node */
-  def stop() {
-    if (driverActor != null) {
-      tell(StopBlockManagerMaster)
-      driverActor = null
-      logInfo("BlockManagerMaster stopped")
-    }
-  }
-
-  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
-  private def tell(message: Any) {
-    if (!askDriverWithReply[Boolean](message)) {
-      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
-    }
-  }
-
-  /**
-   * Send a message to the driver actor and get its result within a default timeout, or
-   * throw a SparkException if this fails.
-   */
-  private def askDriverWithReply[T](message: Any): T = {
-    // TODO: Consider removing multiple attempts
-    if (driverActor == null) {
-      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
-        "[message = " + message + "]")
-    }
-    var attempts = 0
-    var lastException: Exception = null
-    while (attempts < AKKA_RETRY_ATTEMPTS) {
-      attempts += 1
-      try {
-        val future = driverActor.ask(message)(timeout)
-        val result = Await.result(future, timeout)
-        if (result == null) {
-          throw new SparkException("BlockManagerMaster returned null")
-        }
-        return result.asInstanceOf[T]
-      } catch {
-        case ie: InterruptedException => throw ie
-        case e: Exception =>
-          lastException = e
-          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
-      }
-      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
-    }
-
-    throw new SparkException(
-      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
-  }
+  def stop()
 }
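With the ask/retry plumbing moved out, `BlockManagerMaster` is now a pure interface, so alternative implementations no longer need an actor at all. Below is a hypothetical minimal test double, not part of this patch, that answers every query with "no blocks" and treats every command as a no-op; it is placed in `org.apache.spark.storage` because the trait and its parameter types are `private[spark]`:

```scala
package org.apache.spark.storage

import akka.actor.ActorRef

// Hypothetical stub: useful for exercising callers of the trait without Akka.
private[spark] class NoOpBlockManagerMaster extends BlockManagerMaster {
  override def removeExecutor(execId: String) {}
  override def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = true
  override def registerBlockManager(
      blockManagerId: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {}
  override def updateBlockInfo(
      blockManagerId: BlockManagerId,
      blockId: BlockId,
      storageLevel: StorageLevel,
      memSize: Long,
      diskSize: Long,
      tachyonSize: Long): Boolean = true
  override def getLocations(blockId: BlockId): Seq[BlockManagerId] = Seq.empty
  override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] =
    blockIds.map(_ => Seq.empty[BlockManagerId]).toSeq
  override def contains(blockId: BlockId): Boolean = false
  override def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] =
    Seq.empty
  override def removeBlock(blockId: BlockId) {}
  override def removeRdd(rddId: Int, blocking: Boolean) {}
  override def removeShuffle(shuffleId: Int, blocking: Boolean) {}
  override def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {}
  override def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = Map.empty
  override def getStorageStatus: Array[StorageStatus] = Array.empty
  override def getBlockStatus(
      blockId: BlockId,
      askSlaves: Boolean): Map[BlockManagerId, BlockStatus] = Map.empty
  override def getMatchingBlockIds(filter: BlockId => Boolean, askSlaves: Boolean): Seq[BlockId] =
    Seq.empty
  override def stop() {}
}
```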
diff --git a/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala
new file mode 100644
index 000000000000..df95c6df139f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/storage/StandaloneBlockManagerMaster.scala
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+
+import akka.actor._
+import akka.pattern.ask
+
+import org.apache.spark.{Logging, SparkConf, SparkException}
+import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.AkkaUtils
+
+private[spark]
+class StandaloneBlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging
+  with BlockManagerMaster {
+  val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3)
+  val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000)
+
+  val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster"
+
+  val timeout = AkkaUtils.askTimeout(conf)
+
+  /** Remove a dead executor from the driver actor. This is only called on the driver side. */
+  override def removeExecutor(execId: String) {
+    tell(RemoveExecutor(execId))
+    logInfo("Removed " + execId + " successfully in removeExecutor")
+  }
+
+  /**
+   * Send the driver actor a heart beat from the slave. Returns true if everything works out,
+   * false if the driver does not know about the given block manager, which means the block
+   * manager should re-register.
+   */
+  override def sendHeartBeat(blockManagerId: BlockManagerId): Boolean = {
+    askDriverWithReply[Boolean](HeartBeat(blockManagerId))
+  }
+
+  /** Register the BlockManager's id with the driver. */
+  override def registerBlockManager(blockManagerId: BlockManagerId, maxMemSize: Long,
+      slaveActor: ActorRef) {
+    logInfo("Trying to register BlockManager")
+    tell(RegisterBlockManager(blockManagerId, maxMemSize, slaveActor))
+    logInfo("Registered BlockManager")
+  }
+
+  override def updateBlockInfo(
+      blockManagerId: BlockManagerId,
+      blockId: BlockId,
+      storageLevel: StorageLevel,
+      memSize: Long,
+      diskSize: Long,
+      tachyonSize: Long): Boolean = {
+    val res = askDriverWithReply[Boolean](
+      UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize, tachyonSize))
+    logInfo("Updated info of block " + blockId)
+    res
+  }
+
+  /** Get locations of the blockId from the driver */
+  override def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetLocations(blockId))
+  }
+
+  /** Get locations of multiple blockIds from the driver */
+  override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
+    askDriverWithReply[Seq[Seq[BlockManagerId]]](GetLocationsMultipleBlockIds(blockIds))
+  }
+
+  /**
+   * Check if block manager master has a block. Note that this can be used to check for only
+   * those blocks that are reported to block manager master.
+   */
+  override def contains(blockId: BlockId) = {
+    !getLocations(blockId).isEmpty
+  }
+
+  /** Get ids of other nodes in the cluster from the driver */
+  override def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
+    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
+    if (result.length != numPeers) {
+      throw new SparkException(
+        "Error getting peers, only got " + result.size + " instead of " + numPeers)
+    }
+    result
+  }
+
+  /**
+   * Remove a block from the slaves that have it. This can only be used to remove
+   * blocks that the driver knows about.
+   */
+  override def removeBlock(blockId: BlockId) {
+    askDriverWithReply(RemoveBlock(blockId))
+  }
+
+  /** Remove all blocks belonging to the given RDD. */
+  override def removeRdd(rddId: Int, blocking: Boolean) {
+    val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
+    future.onFailure {
+      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+    }
+    if (blocking) {
+      Await.result(future, timeout)
+    }
+  }
+
+  /** Remove all blocks belonging to the given shuffle. */
+  override def removeShuffle(shuffleId: Int, blocking: Boolean) {
+    val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
+    future.onFailure {
+      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+    }
+    if (blocking) {
+      Await.result(future, timeout)
+    }
+  }
+
+  /** Remove all blocks belonging to the given broadcast. */
+  override def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) {
+    val future = askDriverWithReply[Future[Seq[Int]]](
+      RemoveBroadcast(broadcastId, removeFromMaster))
+    future.onFailure {
+      case e: Throwable =>
+        logError("Failed to remove broadcast " + broadcastId +
+          " with removeFromMaster = " + removeFromMaster, e)
+    }
+    if (blocking) {
+      Await.result(future, timeout)
+    }
+  }
+
+  /**
+   * Return the memory status for each block manager, in the form of a map from
+   * the block manager's id to two long values. The first value is the maximum
+   * amount of memory allocated for the block manager, while the second is the
+   * amount of remaining memory.
+   */
+  override def getMemoryStatus: Map[BlockManagerId, (Long, Long)] = {
+    askDriverWithReply[Map[BlockManagerId, (Long, Long)]](GetMemoryStatus)
+  }
+
+  override def getStorageStatus: Array[StorageStatus] = {
+    askDriverWithReply[Array[StorageStatus]](GetStorageStatus)
+  }
+
+  /**
+   * Return the block's status on all block managers, if any. NOTE: This is a
+   * potentially expensive operation and should only be used for testing.
+   *
+   * If askSlaves is true, this invokes the master to query each block manager for the most
+   * updated block statuses. This is useful when the master is not informed of the given block
+   * by all block managers.
+   */
+  override def getBlockStatus(
+      blockId: BlockId,
+      askSlaves: Boolean = true): Map[BlockManagerId, BlockStatus] = {
+    val msg = GetBlockStatus(blockId, askSlaves)
+    /*
+     * To avoid potential deadlocks, the use of Futures is necessary, because the master actor
+     * should not block on waiting for a block manager, which can in turn be waiting for the
+     * master actor for a response to a prior message.
+     */
+    val response = askDriverWithReply[Map[BlockManagerId, Future[Option[BlockStatus]]]](msg)
+    val (blockManagerIds, futures) = response.unzip
+    val result = Await.result(Future.sequence(futures), timeout)
+    if (result == null) {
+      throw new SparkException("BlockManager returned null for BlockStatus query: " + blockId)
+    }
+    val blockStatus = result.asInstanceOf[Iterable[Option[BlockStatus]]]
+    blockManagerIds.zip(blockStatus).flatMap { case (blockManagerId, status) =>
+      status.map { s => (blockManagerId, s) }
+    }.toMap
+  }
+
+  /**
+   * Return a list of ids of existing blocks such that the ids match the given filter. NOTE: This
+   * is a potentially expensive operation and should only be used for testing.
+   *
+   * If askSlaves is true, this invokes the master to query each block manager for the most
+   * updated block statuses. This is useful when the master is not informed of the given block
+   * by all block managers.
+   */
+  override def getMatchingBlockIds(
+      filter: BlockId => Boolean,
+      askSlaves: Boolean): Seq[BlockId] = {
+    val msg = GetMatchingBlockIds(filter, askSlaves)
+    val future = askDriverWithReply[Future[Seq[BlockId]]](msg)
+    Await.result(future, timeout)
+  }
+
+  /** Stop the driver actor, called only on the Spark driver node */
+  override def stop() {
+    if (driverActor != null) {
+      tell(StopBlockManagerMaster)
+      driverActor = null
+      logInfo("BlockManagerMaster stopped")
+    }
+  }
+
+  /** Send a one-way message to the master actor, to which we expect it to reply with true. */
+  private def tell(message: Any) {
+    if (!askDriverWithReply[Boolean](message)) {
+      throw new SparkException("BlockManagerMasterActor returned false, expected true.")
+    }
+  }
+
+  /**
+   * Send a message to the driver actor and get its result within a default timeout, or
+   * throw a SparkException if this fails.
+   */
+  private def askDriverWithReply[T](message: Any): T = {
+    // TODO: Consider removing multiple attempts
+    if (driverActor == null) {
+      throw new SparkException("Error sending message to BlockManager as driverActor is null " +
+        "[message = " + message + "]")
+    }
+    var attempts = 0
+    var lastException: Exception = null
+    while (attempts < AKKA_RETRY_ATTEMPTS) {
+      attempts += 1
+      try {
+        val future = driverActor.ask(message)(timeout)
+        val result = Await.result(future, timeout)
+        if (result == null) {
+          throw new SparkException("BlockManagerMaster returned null")
+        }
+        return result.asInstanceOf[T]
+      } catch {
+        case ie: InterruptedException => throw ie
+        case e: Exception =>
+          lastException = e
+          logWarning("Error sending message to BlockManagerMaster in " + attempts + " attempts", e)
+      }
+      Thread.sleep(AKKA_RETRY_INTERVAL_MS)
+    }
+
+    throw new SparkException(
+      "Error sending message to BlockManagerMaster [message = " + message + "]", lastException)
+  }
+
+}
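The retry loop in `askDriverWithReply` bounds how long a caller can block when the master never answers: each attempt waits up to `timeout`, then sleeps `AKKA_RETRY_INTERVAL_MS` before retrying. A back-of-envelope worked example with the defaults read by this class; the 30-second `spark.akka.askTimeout` default is an assumption about `AkkaUtils.askTimeout`, not something this patch shows:

```scala
// Worst case for a message that is never answered, using the defaults above.
val attempts   = 3                  // spark.akka.num.retries
val intervalMs = 3000L              // spark.akka.retry.wait
val timeoutMs  = 30 * 1000L         // assumed spark.akka.askTimeout default
val worstCase  = attempts * (timeoutMs + intervalMs)
// = 3 * 33000 ms = 99000 ms, i.e. a caller can block for roughly 1.7 minutes
```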
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 75c2e09a6bbb..0b1c5d3fceeb 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -96,7 +96,7 @@ private[spark] object ThreadingTest {
     val actorSystem = ActorSystem("test")
     val conf = new SparkConf()
     val serializer = new KryoSerializer(conf)
-    val blockManagerMaster = new BlockManagerMaster(
+    val blockManagerMaster = new StandaloneBlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
       conf)
     val blockManager = new BlockManager(
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 9021662bcf71..cccc39f5c55f 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, FunSuiteLike}
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
+import org.apache.spark.storage.{BlockId, BlockManagerId, StandaloneBlockManagerMaster}
 import org.apache.spark.util.CallSite
 
 class BuggyDAGEventProcessActor extends Actor {
@@ -115,7 +115,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf) {
+  val blockManagerMaster = new StandaloneBlockManagerMaster(null, conf) {
       override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
         blockIds.map {
           _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).
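DAGSchedulerSuite still instantiates the Akka-backed class with a `null` `driverActor`, which is safe only because the overridden `getLocations` is the sole method the suite exercises. Now that `BlockManagerMaster` is a trait, the same stub could avoid the `null` entirely. A sketch building on the hypothetical `NoOpBlockManagerMaster` sketched earlier; the `.getOrElse(Seq())` completion of the truncated hunk above is assumed, not quoted:

```scala
val blockManagerMaster = new NoOpBlockManagerMaster {
  override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
    blockIds.map {
      // Map an RDD block id to its (rddId, splitIndex) key and look it up in
      // the suite's cacheLocations map; anything else has no known location.
      _.asRDDId.map(id => (id.rddId, id.splitIndex))
        .flatMap(key => cacheLocations.get(key))
        .getOrElse(Seq())
    }.toSeq
  }
}
```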
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index dd4fd535d357..1016e3c553d2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -81,7 +81,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
 
-    master = new BlockManagerMaster(
+    master = new StandaloneBlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
       conf)
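All three call sites now name the concrete class while the surrounding code keeps depending on the trait. Pulled out of the diff context, the wiring they share looks like this; a sketch assuming it compiles inside the `org.apache.spark` namespace (the storage types are `private[spark]`), not a new test:

```scala
import akka.actor.{ActorSystem, Props}
import org.apache.spark.SparkConf
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.storage._

val actorSystem = ActorSystem("test")
val conf = new SparkConf()

// The variable keeps the trait type; only the constructor names the implementation.
val master: BlockManagerMaster = new StandaloneBlockManagerMaster(
  actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
  conf)

master.getMemoryStatus  // empty until a BlockManager registers
master.stop()
actorSystem.shutdown()
```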