From efd93f2026ddc427e84fa03e8a595ded2b1a81ce Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Sun, 12 Oct 2014 11:35:20 +0300 Subject: [PATCH 1/3] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. --- .../storage/BlockManagerMasterActor.scala | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 2c1a4e2f5d3a..5cb9a50a3d25 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -145,12 +145,17 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - + // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId + + logInfo("removed executorId %s from blockManagerIdByExecutor".format(blockManagerId.executorId)) // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) + + logInfo("removed blockManagerId %s from blockManagerInfo".format(blockManagerId)) + val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -160,6 +165,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act blockLocations.remove(locations) } } + + logInfo("done with remove "+blockManagerId) } private def expireDeadHosts() { @@ -180,6 +187,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) + logInfo("removed executor " + execId + " from BlockManagerMaster.") } private def heartBeat(blockManagerId: BlockManagerId): Boolean = { @@ -223,18 +231,25 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { + logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id)) + if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => // A block manager of the same executor already exists. // This should never happen. Let's just quit. - logError("Got two different block manager registrations on " + id.executorId) - System.exit(1) + logError("Got two different block manager registrations on same executor - will remove, new Id " + id+", orig id - "+manager) + removeExecutor(id.executorId) case None => - blockManagerIdByExecutor(id.executorId) = id + logInfo("about to register new id "+id) } - blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( - id, System.currentTimeMillis(), maxMemSize, slaveActor) + + blockManagerIdByExecutor(id.executorId) = id + logInfo("Added %s to blockManagerIdByExecutor".format(id.executorId)) + + val info = new BlockManagerMasterActor.BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) + blockManagerInfo(id) = info + logInfo("Added %s, %s to blockManagerInfo".format(id, info)) } } From 81d69f088e421b19e47495d06e8b187a0ec29075 Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Sun, 12 Oct 2014 11:41:53 +0300 Subject: [PATCH 2/3] fixed comment --- .../org/apache/spark/storage/BlockManagerMasterActor.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 5cb9a50a3d25..50f22f64dd3d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -236,8 +236,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { case Some(manager) => - // A block manager of the same executor already exists. - // This should never happen. Let's just quit. + // A block manager of the same executor already exists so remove it (assumed dead). logError("Got two different block manager registrations on same executor - will remove, new Id " + id+", orig id - "+manager) removeExecutor(id.executorId) case None => From 95ae4dbe090f16e863614a2f4f2e0b0fb90ec1af Mon Sep 17 00:00:00 2001 From: Tal Sliwowicz Date: Thu, 23 Oct 2014 16:37:51 -0400 Subject: [PATCH 3/3] [SPARK-4006] In long running contexts, we encountered the situation of double registe... ...r without a remove in between. The cause for that is unknown, and assumed a temp network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. This is just like https://github.com/apache/spark/pull/2854 except it's on master Author: Tal Sliwowicz Closes #2886 from tsliwowicz/master-block-mgr-removal and squashes the following commits: 094d508 [Tal Sliwowicz] some more white space change undone 41a2217 [Tal Sliwowicz] some more whitspaces change undone 7bcfc3d [Tal Sliwowicz] whitspaces fix df9d98f [Tal Sliwowicz] Code review comments fixed f48bce9 [Tal Sliwowicz] In long running contexts, we encountered the situation of double register without a remove in between. The cause for that is unknown, and assumed a temp network issue. (cherry picked from commit 6b485225271a3c616c4fa1231c20090a95c86f32) Conflicts: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala --- .../storage/BlockManagerMasterActor.scala | 31 +++++++------------ 1 file changed, 11 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 50f22f64dd3d..8a82dc152e2b 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -145,17 +145,12 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeBlockManager(blockManagerId: BlockManagerId) { val info = blockManagerInfo(blockManagerId) - + // Remove the block manager from blockManagerIdByExecutor. blockManagerIdByExecutor -= blockManagerId.executorId - - logInfo("removed executorId %s from blockManagerIdByExecutor".format(blockManagerId.executorId)) // Remove it from blockManagerInfo and remove all the blocks. blockManagerInfo.remove(blockManagerId) - - logInfo("removed blockManagerId %s from blockManagerInfo".format(blockManagerId)) - val iterator = info.blocks.keySet.iterator while (iterator.hasNext) { val blockId = iterator.next @@ -165,8 +160,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act blockLocations.remove(locations) } } - - logInfo("done with remove "+blockManagerId) + logInfo(s"Removing block manager $blockManagerId") } private def expireDeadHosts() { @@ -187,7 +181,6 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act private def removeExecutor(execId: String) { logInfo("Trying to remove executor " + execId + " from BlockManagerMaster.") blockManagerIdByExecutor.get(execId).foreach(removeBlockManager) - logInfo("removed executor " + execId + " from BlockManagerMaster.") } private def heartBeat(blockManagerId: BlockManagerId): Boolean = { @@ -231,24 +224,22 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf) extends Act } private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) { - logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id)) - if (!blockManagerInfo.contains(id)) { blockManagerIdByExecutor.get(id.executorId) match { - case Some(manager) => - // A block manager of the same executor already exists so remove it (assumed dead). - logError("Got two different block manager registrations on same executor - will remove, new Id " + id+", orig id - "+manager) - removeExecutor(id.executorId) + case Some(oldId) => + // A block manager of the same executor already exists, so remove it (assumed dead) + logError("Got two different block manager registrations on same executor - " + + s" will replace old one $oldId with new one $id") + removeExecutor(id.executorId) case None => - logInfo("about to register new id "+id) } + logInfo("Registering block manager %s with %s RAM, %s".format( + id.hostPort, Utils.bytesToString(maxMemSize), id)) blockManagerIdByExecutor(id.executorId) = id - logInfo("Added %s to blockManagerIdByExecutor".format(id.executorId)) - val info = new BlockManagerMasterActor.BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor) - blockManagerInfo(id) = info - logInfo("Added %s, %s to blockManagerInfo".format(id, info)) + blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo( + id, System.currentTimeMillis(), maxMemSize, slaveActor) } }