Commit 03de02d

Change replication logic to correctly refetch peers from master on failure and on new worker addition.
1 parent d081bf6 commit 03de02d
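
In outline, the reworked replicate() picks one random peer at a time, blacklists any peer whose upload fails, forces the cached peer list to be refetched from the master, and stops once enough replicas have been written or the failure budget is exhausted. Below is a minimal, self-contained sketch of that control flow, not the actual BlockManager code: ReplicationSketch, fetchPeers, and upload are illustrative stand-ins for the real class, master.getPeers, and blockTransferService.uploadBlockSync.

import scala.collection.mutable
import scala.util.Random

// Simplified sketch of the new replicate() control flow; not the real BlockManager code.
object ReplicationSketch {
  type Peer = String

  def replicateSketch(
      fetchPeers: () => Set[Peer],
      upload: Peer => Unit,
      numReplicas: Int,
      maxFailures: Int): Set[Peer] = {
    val replicatedTo = mutable.HashSet.empty[Peer]
    val failedPeers = mutable.HashSet.empty[Peer]
    var cachedPeers = fetchPeers()
    var forceFetch = false
    var failures = 0
    var done = false

    // Mirrors getRandomPeer(): refetch the peer list if asked to, then pick a random
    // candidate that has neither succeeded nor failed yet.
    def randomPeer(): Option[Peer] = {
      if (forceFetch) cachedPeers = fetchPeers()
      val candidates = (cachedPeers -- replicatedTo -- failedPeers).toSeq
      if (candidates.nonEmpty) Some(candidates(Random.nextInt(candidates.size))) else None
    }

    while (!done) {
      randomPeer() match {
        case Some(peer) =>
          try {
            upload(peer)
            replicatedTo += peer
            forceFetch = false
            if (replicatedTo.size == numReplicas) done = true
          } catch {
            case _: Exception =>
              failures += 1
              failedPeers += peer   // temporarily blacklist this peer
              forceFetch = true     // refetch peers from the master on the next attempt
              if (failures > maxFailures) done = true
          }
        case None =>
          done = true               // no peers left to try
      }
    }
    replicatedTo.toSet
  }

  def main(args: Array[String]): Unit = {
    // Toy run: peer "b" always fails, so the loop blacklists it and finishes on "a" and "c".
    val result = replicateSketch(
      fetchPeers = () => Set("a", "b", "c"),
      upload = p => if (p == "b") throw new RuntimeException("peer down"),
      numReplicas = 2,
      maxFailures = 1)
    println(s"replicated to: $result")
  }
}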

3 files changed, +101 -70 lines

3 files changed

+101
-70
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 69 additions & 35 deletions
@@ -112,8 +112,9 @@ private[spark] class BlockManager(
     MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
-  private val cachedPeers = new ArrayBuffer[BlockManagerId]
+  private val cachedPeers = new mutable.HashSet[BlockManagerId]
   private var lastPeerFetchTime = 0L
+
   initialize()

   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay

@@ -791,52 +792,85 @@ private[spark] class BlockManager(
   /**
    * Get peer block managers in the system.
    */
-  private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized {
-    val currentTime = System.currentTimeMillis
-    // If cache is empty or has insufficient number of peers, fetch from master
-    if (cachedPeers.isEmpty || numPeers > cachedPeers.size) {
-      cachedPeers.clear()
-      cachedPeers ++= master.getPeers(blockManagerId)
-      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
-      lastPeerFetchTime = System.currentTimeMillis
-    }
-    if (numPeers > cachedPeers.size) {
-      // if enough peers cannot be provided, return all of them
-      logDebug(s"Not enough peers - cached peers = ${cachedPeers.size}, required peers = $numPeers")
-      cachedPeers
-    } else {
-      cachedPeers.take(numPeers)
+  private def getPeers(forceFetch: Boolean): mutable.Set[BlockManagerId] = {
+    val cachedPeersTtl = conf.getInt("spark.storage.cachedPeersTtl", 1000) // milliseconds
+    def timeout = System.currentTimeMillis - lastPeerFetchTime > cachedPeersTtl
+
+    cachedPeers.synchronized {
+      if (cachedPeers.isEmpty || forceFetch || timeout) {
+        cachedPeers.clear()
+        cachedPeers ++= master.getPeers(blockManagerId)
+        lastPeerFetchTime = System.currentTimeMillis
+        logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      }
     }
+    cachedPeers
   }

   /**
    * Replicate block to another node.
    */
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
+    val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
+    val numPeersToReplicateTo = level.replication - 1
+    val peersReplicatedTo = new mutable.HashSet[BlockManagerId]
+    val peersFailedToReplicateTo = new mutable.HashSet[BlockManagerId]
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    val selectedPeers = getPeers(level.replication - 1)
-    if (selectedPeers.size < level.replication - 1) {
-      logWarning(s"Failed to replicate block to ${level.replication - 1} peer(s) " +
-        s"as only ${selectedPeers.size} peer(s) were found")
+    val startTime = System.nanoTime
+
+    var forceFetchPeers = false
+    var failures = 0
+    var done = false
+
+    // Get a random peer
+    def getRandomPeer(): Option[BlockManagerId] = {
+      val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
+      if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None
     }
-    selectedPeers.foreach { peer =>
-      val start = System.nanoTime
-      data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} bytes. " +
-        s"To node: $peer")

-      try {
-        blockTransferService.uploadBlockSync(
-          peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
-      } catch {
-        case e: Exception =>
-          logError(s"Failed to replicate block to $peer", e)
-          cachedPeers.synchronized { cachedPeers.clear() }
+    // One by one choose a random peer and try uploading the block to it
+    // If replication fails (e.g., target peer is down), force the list of cached peers
+    // to be re-fetched from driver and then pick another random peer for replication. Also
+    // temporarily black list the peer for which replication failed.
+    while (!done) {
+      getRandomPeer() match {
+        case Some(peer) =>
+          try {
+            val onePeerStartTime = System.nanoTime
+            data.rewind()
+            logTrace(s"Trying to replicate $blockId of ${data.limit()} bytes to $peer")
+            blockTransferService.uploadBlockSync(
+              peer.host, peer.port, blockId.toString, new NioByteBufferManagedBuffer(data), tLevel)
+            logTrace(s"Replicated $blockId of ${data.limit()} bytes to $peer in %f ms"
+              .format((System.nanoTime - onePeerStartTime) / 1e6))
+            peersReplicatedTo += peer
+            forceFetchPeers = false
+            if (peersReplicatedTo.size == numPeersToReplicateTo) {
+              done = true
+            }
+          } catch {
+            case e: Exception =>
+              logWarning(s"Failed to replicate $blockId to $peer, failure #$failures", e)
+              failures += 1
+              forceFetchPeers = true
+              peersFailedToReplicateTo += peer
+              if (failures > maxReplicationFailures) {
+                done = true
+              }
          }
+        case None =>
+          // no peer left to replicate to
+          done = true
       }
-
-      logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
-        .format(blockId, (System.nanoTime - start) / 1e6, data.limit()))
+    }
+    if (peersReplicatedTo.size < numPeersToReplicateTo) {
+      logError(s"Replicated $blockId of ${data.limit()} bytes to only " +
+        s"${peersReplicatedTo.size} peer(s) instead of ${numPeersToReplicateTo} " +
+        s"in ${(System.nanoTime - startTime) / 1e6} ms")
+    } else {
+      logDebug(s"Successfully replicated $blockId of ${data.limit()} bytes to " +
+        s"${peersReplicatedTo.size} peer(s) in ${(System.nanoTime - startTime) / 1e6} ms")
     }
   }
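
Both settings read above via conf.getInt can be supplied on the application's SparkConf. Below is a minimal sketch of doing so, using the defaults this commit introduces (1000 ms peer-cache TTL, 1 tolerated failure); ReplicationConfSketch, the local[2] master, and the chosen values are illustrative assumptions, not recommendations.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object ReplicationConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("replication-conf-sketch")
      .setMaster("local[2]")
      // How long (ms) the cached peer list is trusted before getPeers() refetches it from the master.
      .set("spark.storage.cachedPeersTtl", "1000")
      // How many failed uploads replicate() tolerates before it stops trying for extra replicas.
      .set("spark.storage.maxReplicationFailures", "1")

    val sc = new SparkContext(conf)
    try {
      // A _2 storage level is what triggers replicate() in the first place.
      sc.parallelize(1 to 100).persist(StorageLevel.MEMORY_ONLY_2).count()
    } finally {
      sc.stop()
    }
  }
}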

core/src/test/resources/log4j.properties

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 #

 # Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootCategory=INFO, file
+log4j.rootCategory=DEBUG, file
 log4j.appender.file=org.apache.log4j.FileAppender
 log4j.appender.file.append=false
 log4j.appender.file.file=target/unit-tests.log

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 31 additions & 34 deletions
@@ -91,7 +91,8 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     conf.set("spark.driver.port", boundPort.toString)
     conf.set("spark.storage.unrollFraction", "0.4")
     conf.set("spark.storage.unrollMemoryThreshold", "512")
-
+    conf.set("spark.core.connection.ack.wait.timeout", "1")
+    conf.set("spark.storage.cachedPeersTtl", "10")
     master = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
       conf, true)

@@ -1300,59 +1301,55 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     testReplication(5, storageLevels)
   }

-  test("block replication with addition and removal of executors") {
+  test("block replication with addition and deletion of executors") {
     val blockSize = 1000
     val storeSize = 10000
     val allStores = new ArrayBuffer[BlockManager]()

+
     try {
       val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
       allStores ++= initialStores

-      // 2x replication works
-      initialStores(0).putSingle("a1", new Array[Byte](blockSize), StorageLevel.MEMORY_AND_DISK_2)
-      assert(master.getLocations("a1").size === 2)
+      def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
+        initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
+        assert(master.getLocations(blockId).size === expectedNumLocations)
+        master.removeBlock(blockId)
+      }

-      // 3x replication should only replicate 2x
-      initialStores(0).putSingle(
-        "a2", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
-      assert(master.getLocations("a2").size === 2)
+      // 2x replication should work, 3x replication should only replicate 2x
+      testPut("a1", StorageLevel.MEMORY_AND_DISK_2, 2)
+      testPut("a2", StorageLevel(true, true, false, true, 3), 2)

+      // Add another store, 3x replication should work now, 4x replication should only replicate 3x
       val newStore1 = makeBlockManager(storeSize, s"newstore1")
       allStores += newStore1
+      eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+        testPut("a3", StorageLevel(true, true, false, true, 3), 3)
+      }
+      testPut("a4", StorageLevel(true, true, false, true, 4), 3)

-      // 3x replication should work now
-      initialStores(0).putSingle(
-        "a3", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
-      assert(master.getLocations("a3").size === 3)
-
-      // 4x replication should only replicate 3x
-      initialStores(0).putSingle(
-        "a4", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
-      assert(master.getLocations("a4").size === 3)
-
+      // Add another store, 4x replication should work now
       val newStore2 = makeBlockManager(storeSize, s"newstore2")
       allStores += newStore2
+      eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+        testPut("a5", StorageLevel(true, true, false, true, 4), 4)
+      }

-      // 4x replication should work now
-      initialStores(0).putSingle(
-        "a5", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
-      assert(master.getLocations("a5").size === 4)
-
-      // Remove all the stores and add new stores
-      (initialStores ++ Seq(newStore1, newStore2)).map { store =>
-        store.blockManagerId.executorId
-      }.foreach { execId =>
-        master.removeExecutor(execId)
+      // Remove all but the 1st store, 2x replication should fail
+      (initialStores.slice(1, initialStores.size) ++ Seq(newStore1, newStore2)).foreach {
+        store =>
+          master.removeExecutor(store.blockManagerId.executorId)
+          store.stop()
       }
+      testPut("a6", StorageLevel.MEMORY_AND_DISK_2, 1)

-      // Add new stores and test if replication works
+      // Add new stores, 3x replication should work
       val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
       allStores ++= newStores
-
-      newStores(0).putSingle(
-        "a6", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
-      assert(master.getLocations("a6").size === 3)
+      eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+        testPut("a7", StorageLevel(true, true, false, true, 3), 3)
+      }
     } finally {
       allStores.foreach { _.stop() }
     }
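
The testPut calls that follow the addition of a new store are wrapped in ScalaTest's eventually because the new block manager registers with the master asynchronously and only becomes a replication target after the cached peer list refreshes (hence the suite lowering spark.storage.cachedPeersTtl to 10 ms). Below is a minimal sketch of the same retry idiom, assuming only ScalaTest's Eventually and SpanSugar are on the classpath; EventuallySketch and the attempt counter are illustrative.

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch extends App {
  // eventually(...) re-runs the block until it stops throwing or the timeout expires.
  var attempts = 0
  eventually(timeout(1000.milliseconds), interval(10.milliseconds)) {
    attempts += 1
    assert(attempts >= 3, s"only $attempts attempt(s) so far")  // succeeds on the third retry
  }
  println(s"condition met after $attempts attempts")
}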
