Skip to content

Commit de4ff73

Browse files
committed
[SPARK-3495] Block replication fails continuously when the replication target node is dead AND [SPARK-3496] Block replication by mistake chooses driver as target
If a block manager (say, A) wants to replicate a block and the node chosen for replication (say, B) is dead, then the attempt to send the block to B fails. However, this continues to fail indefinitely. Even if the driver learns about the demise of the B, A continues to try replicating to B and failing miserably. The reason behind this bug is that A initially fetches a list of peers from the driver (when B was active), but never updates it after B is dead. This affects Spark Streaming as its receiver uses block replication. The solution in this patch adds the following. - Changed BlockManagerMaster to return all the peers of a block manager, rather than the requested number. It also filters out driver BlockManager. - Refactored BlockManager's replication code to handle peer caching correctly. + The peer for replication is randomly selected. This is different from past behavior where for a node A, a node B was deterministically chosen for the lifetime of the application. + If replication fails to one node, the peers are refetched. + The peer cached has a TTL of 1 second to enable discovery of new peers and using them for replication. - Refactored use of \<driver\> in BlockManager into a new method `BlockManagerId.isDriver` - Added replication unit tests (replication was not tested till now, duh!) This should not make a difference in performance of Spark workloads where replication is not used. @andrewor14 @JoshRosen Author: Tathagata Das <[email protected]> Closes #2366 from tdas/replication-fix and squashes the following commits: 9690f57 [Tathagata Das] Moved replication tests to a new BlockManagerReplicationSuite. 0661773 [Tathagata Das] Minor changes based on PR comments. a55a65c [Tathagata Das] Added a unit test to test replication behavior. 012afa3 [Tathagata Das] Bug fix 89f91a0 [Tathagata Das] Minor change. 68e2c72 [Tathagata Das] Made replication peer selection logic more efficient. 08afaa9 [Tathagata Das] Made peer selection for replication deterministic to block id 3821ab9 [Tathagata Das] Fixes based on PR comments. 08e5646 [Tathagata Das] More minor changes. d402506 [Tathagata Das] Fixed imports. 4a20531 [Tathagata Das] Filtered driver block manager from peer list, and also consolidated the use of <driver> in BlockManager. 7598f91 [Tathagata Das] Minor changes. 03de02d [Tathagata Das] Change replication logic to correctly refetch peers from master on failure and on new worker addition. d081bf6 [Tathagata Das] Fixed bug in get peers and unit tests to test get-peers and replication under executor churn. 9f0ac9f [Tathagata Das] Modified replication tests to fail on replication bug. af0c1da [Tathagata Das] Added replication unit tests to BlockManagerSuite Conflicts: core/src/main/scala/org/apache/spark/storage/BlockManager.scala core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
1 parent 0a40eac commit de4ff73

File tree

8 files changed

+535
-44
lines changed

8 files changed

+535
-44
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 104 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ private[spark] class BlockManager(
139139
private val broadcastCleaner = new MetadataCleaner(
140140
MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
141141

142+
// Field related to peer block managers that are necessary for block replication
143+
@volatile private var cachedPeers: Seq[BlockManagerId] = _
144+
private val peerFetchLock = new Object
145+
private var lastPeerFetchTime = 0L
146+
142147
initialize()
143148

144149
/* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -822,28 +827,111 @@ private[spark] class BlockManager(
822827
}
823828

824829
/**
825-
* Replicate block to another node.
830+
* Get peer block managers in the system.
831+
*/
832+
private def getPeers(forceFetch: Boolean): Seq[BlockManagerId] = {
833+
peerFetchLock.synchronized {
834+
val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 60 * 1000) // milliseconds
835+
val timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
836+
if (cachedPeers == null || forceFetch || timeout) {
837+
cachedPeers = master.getPeers(blockManagerId).sortBy(_.hashCode)
838+
lastPeerFetchTime = System.currentTimeMillis
839+
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
840+
}
841+
cachedPeers
842+
}
843+
}
844+
845+
/**
846+
* Replicate block to another node. Not that this is a blocking call that returns after
847+
* the block has been replicated.
826848
*/
827-
@volatile var cachedPeers: Seq[BlockManagerId] = null
828849
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
850+
val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
851+
val numPeersToReplicateTo = level.replication - 1
852+
val peersForReplication = new ArrayBuffer[BlockManagerId]
853+
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
854+
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
829855
val tLevel = StorageLevel(
830856
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
831-
if (cachedPeers == null) {
832-
cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
857+
val startTime = System.currentTimeMillis
858+
val random = new Random(blockId.hashCode)
859+
860+
var replicationFailed = false
861+
var failures = 0
862+
var done = false
863+
864+
// Get cached list of peers
865+
peersForReplication ++= getPeers(forceFetch = false)
866+
867+
// Get a random peer. Note that this selection of a peer is deterministic on the block id.
868+
// So assuming the list of peers does not change and no replication failures,
869+
// if there are multiple attempts in the same node to replicate the same block,
870+
// the same set of peers will be selected.
871+
def getRandomPeer(): Option[BlockManagerId] = {
872+
// If replication had failed, then force update the cached list of peers and remove the peers
873+
// that have been already used
874+
if (replicationFailed) {
875+
peersForReplication.clear()
876+
peersForReplication ++= getPeers(forceFetch = true)
877+
peersForReplication --= peersReplicatedTo
878+
peersForReplication --= peersFailedToReplicateTo
879+
}
880+
if (!peersForReplication.isEmpty) {
881+
Some(peersForReplication(random.nextInt(peersForReplication.size)))
882+
} else {
883+
None
884+
}
833885
}
834-
for (peer: BlockManagerId <- cachedPeers) {
835-
val start = System.nanoTime
836-
data.rewind()
837-
logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
838-
s"To node: $peer")
839-
val putBlock = PutBlock(blockId, data, tLevel)
840-
val cmId = new ConnectionManagerId(peer.host, peer.port)
841-
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
842-
if (!syncPutBlockSuccess) {
843-
logError(s"Failed to call syncPutBlock to $peer")
886+
887+
// One by one choose a random peer and try uploading the block to it
888+
// If replication fails (e.g., target peer is down), force the list of cached peers
889+
// to be re-fetched from driver and then pick another random peer for replication. Also
890+
// temporarily black list the peer for which replication failed.
891+
//
892+
// This selection of a peer and replication is continued in a loop until one of the
893+
// following 3 conditions is fulfilled:
894+
// (i) specified number of peers have been replicated to
895+
// (ii) too many failures in replicating to peers
896+
// (iii) no peer left to replicate to
897+
//
898+
while (!done) {
899+
getRandomPeer() match {
900+
case Some(peer) =>
901+
val onePeerStartTime = System.currentTimeMillis
902+
data.rewind()
903+
logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
904+
val putBlock = PutBlock(blockId, data, tLevel)
905+
val cmId = new ConnectionManagerId(peer.host, peer.port)
906+
val syncPutBlockSuccess = BlockManagerWorker.syncPutBlock(putBlock, cmId)
907+
if (syncPutBlockSuccess) {
908+
logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %d ms"
909+
.format((System.currentTimeMillis - onePeerStartTime)))
910+
peersReplicatedTo += peer
911+
peersForReplication -= peer
912+
replicationFailed = false
913+
if (peersReplicatedTo.size == numPeersToReplicateTo) {
914+
done = true // specified number of peers have been replicated to
915+
}
916+
} else {
917+
logWarning(s"Failed to replicate $blockId to $peer, failure #$failures")
918+
failures += 1
919+
replicationFailed = true
920+
peersFailedToReplicateTo += peer
921+
if (failures > maxReplicationFailures) { // too many failures in replicating to peers
922+
done = true
923+
}
924+
}
925+
case None => // no peer left to replicate to
926+
done = true
844927
}
845-
logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
846-
.format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
928+
}
929+
val timeTakeMs = (System.currentTimeMillis - startTime)
930+
logTrace(s"Replicating $blockId of ${data.limit()} bytes to " +
931+
s"${peersReplicatedTo.size} peer(s) took $timeTakeMs ms")
932+
if (peersReplicatedTo.size < numPeersToReplicateTo) {
933+
logWarning(s"Block $blockId replicated to only " +
934+
s"${peersReplicatedTo.size} peer(s) instead of $numPeersToReplicateTo peers")
847935
}
848936
}
849937

core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,9 @@ class BlockManagerId private (
6262

6363
def nettyPort: Int = nettyPort_
6464

65-
override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
65+
def isDriver: Boolean = (executorId == "<driver>")
66+
67+
override def writeExternal(out: ObjectOutput) {
6668
out.writeUTF(executorId_)
6769
out.writeUTF(host_)
6870
out.writeInt(port_)

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
8080
}
8181

8282
/** Get ids of other nodes in the cluster from the driver */
83-
def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
84-
val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
85-
if (result.length != numPeers) {
86-
throw new SparkException(
87-
"Error getting peers, only got " + result.size + " instead of " + numPeers)
88-
}
89-
result
83+
def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
84+
askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
9085
}
9186

9287
/**

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
8383
case GetLocationsMultipleBlockIds(blockIds) =>
8484
sender ! getLocationsMultipleBlockIds(blockIds)
8585

86-
case GetPeers(blockManagerId, size) =>
87-
sender ! getPeers(blockManagerId, size)
86+
case GetPeers(blockManagerId) =>
87+
sender ! getPeers(blockManagerId)
8888

8989
case GetMemoryStatus =>
9090
sender ! memoryStatus
@@ -173,11 +173,10 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
173173
* from the executors, but not from the driver.
174174
*/
175175
private def removeBroadcast(broadcastId: Long, removeFromDriver: Boolean): Future[Seq[Int]] = {
176-
// TODO: Consolidate usages of <driver>
177176
import context.dispatcher
178177
val removeMsg = RemoveBroadcast(broadcastId, removeFromDriver)
179178
val requiredBlockManagers = blockManagerInfo.values.filter { info =>
180-
removeFromDriver || info.blockManagerId.executorId != "<driver>"
179+
removeFromDriver || !info.blockManagerId.isDriver
181180
}
182181
Future.sequence(
183182
requiredBlockManagers.map { bm =>
@@ -213,7 +212,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
213212
val minSeenTime = now - slaveTimeout
214213
val toRemove = new mutable.HashSet[BlockManagerId]
215214
for (info <- blockManagerInfo.values) {
216-
if (info.lastSeenMs < minSeenTime && info.blockManagerId.executorId != "<driver>") {
215+
if (info.lastSeenMs < minSeenTime && !info.blockManagerId.isDriver) {
217216
logWarning("Removing BlockManager " + info.blockManagerId + " with no recent heart beats: "
218217
+ (now - info.lastSeenMs) + "ms exceeds " + slaveTimeout + "ms")
219218
toRemove += info.blockManagerId
@@ -233,7 +232,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
233232
*/
234233
private def heartbeatReceived(blockManagerId: BlockManagerId): Boolean = {
235234
if (!blockManagerInfo.contains(blockManagerId)) {
236-
blockManagerId.executorId == "<driver>" && !isLocal
235+
blockManagerId.isDriver && !isLocal
237236
} else {
238237
blockManagerInfo(blockManagerId).updateLastSeenMs()
239238
true
@@ -355,7 +354,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
355354
tachyonSize: Long) {
356355

357356
if (!blockManagerInfo.contains(blockManagerId)) {
358-
if (blockManagerId.executorId == "<driver>" && !isLocal) {
357+
if (blockManagerId.isDriver && !isLocal) {
359358
// We intentionally do not register the master (except in local mode),
360359
// so we should not indicate failure.
361360
sender ! true
@@ -403,16 +402,14 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
403402
blockIds.map(blockId => getLocations(blockId))
404403
}
405404

406-
private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
407-
val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
408-
409-
val selfIndex = peers.indexOf(blockManagerId)
410-
if (selfIndex == -1) {
411-
throw new SparkException("Self index for " + blockManagerId + " not found")
405+
/** Get the list of the peers of the given block manager */
406+
private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
407+
val blockManagerIds = blockManagerInfo.keySet
408+
if (blockManagerIds.contains(blockManagerId)) {
409+
blockManagerIds.filterNot { _.isDriver }.filterNot { _ == blockManagerId }.toSeq
410+
} else {
411+
Seq.empty
412412
}
413-
414-
// Note that this logic will select the same node multiple times if there aren't enough peers
415-
Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
416413
}
417414
}
418415

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ private[spark] object BlockManagerMessages {
9090

9191
case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster
9292

93-
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
93+
case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster
9494

9595
case class RemoveExecutor(execId: String) extends ToBlockManagerMaster
9696

core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext {
132132
val statuses = bmm.getBlockStatus(blockId, askSlaves = true)
133133
assert(statuses.size === 1)
134134
statuses.head match { case (bm, status) =>
135-
assert(bm.executorId === "<driver>", "Block should only be on the driver")
135+
assert(bm.isDriver, "Block should only be on the driver")
136136
assert(status.storageLevel === StorageLevel.MEMORY_AND_DISK)
137137
assert(status.memSize > 0, "Block should be in memory store on the driver")
138138
assert(status.diskSize === 0, "Block should not be in disk store on the driver")

0 commit comments

Comments
 (0)