Skip to content

Commit 59297e9

Browse files
tsliwowiczAndrew Or
authored andcommitted
[SPARK-4006] In long running contexts, we encountered the situation of d...
...ouble registe... ...r without a remove in between. The cause for that is unknown, and assumed a temp network issue. However, since the second register is with a BlockManagerId on a different port, blockManagerInfo.contains() returns false, while blockManagerIdByExecutor returns Some. This inconsistency is caught in a conditional statement that does System.exit(1), which is a huge robustness issue for us. The fix - simply remove the old id from both maps during register when this happens. We are mimicking the behavior of expireDeadHosts(), by doing local cleanup of the maps before trying to add new ones. Also - added some logging for register and unregister. This is just like #2886 except it's on branch-1.1 Author: Tal Sliwowicz <[email protected]> Closes #2915 from tsliwowicz/branch-1.1-block-mgr-removal and squashes the following commits: d122236 [Tal Sliwowicz] [SPARK-4006] In long running contexts, we encountered the situation of double registe...
1 parent 80dde80 commit 59297e9

File tree

1 file changed

+13
-12
lines changed

1 file changed

+13
-12
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
204204
}
205205
}
206206
listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
207+
logInfo(s"Removing block manager $blockManagerId")
207208
}
208209

209210
private def expireDeadHosts() {
@@ -327,20 +328,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
327328
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
328329
if (!blockManagerInfo.contains(id)) {
329330
blockManagerIdByExecutor.get(id.executorId) match {
330-
case Some(manager) =>
331-
// A block manager of the same executor already exists.
332-
// This should never happen. Let's just quit.
333-
logError("Got two different block manager registrations on " + id.executorId)
334-
System.exit(1)
331+
case Some(oldId) =>
332+
// A block manager of the same executor already exists, so remove it (assumed dead)
333+
logError("Got two different block manager registrations on same executor - "
334+
+ s" will replace old one $oldId with new one $id")
335+
removeExecutor(id.executorId)
335336
case None =>
336-
blockManagerIdByExecutor(id.executorId) = id
337337
}
338-
339-
logInfo("Registering block manager %s with %s RAM".format(
340-
id.hostPort, Utils.bytesToString(maxMemSize)))
341-
342-
blockManagerInfo(id) =
343-
new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
338+
logInfo("Registering block manager %s with %s RAM, %s".format(
339+
id.hostPort, Utils.bytesToString(maxMemSize), id))
340+
341+
blockManagerIdByExecutor(id.executorId) = id
342+
343+
blockManagerInfo(id) = new BlockManagerInfo(
344+
id, System.currentTimeMillis(), maxMemSize, slaveActor)
344345
}
345346
listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
346347
}

0 commit comments

Comments
 (0)