Skip to content

Commit 08afaa9

Browse files
committed
Made peer selection for replication deterministic to block id
1 parent 3821ab9 commit 08afaa9

File tree

2 files changed

+45
-7
lines changed

2 files changed

+45
-7
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,7 @@ private[spark] class BlockManager(
798798
cachedPeers.synchronized {
799799
if (cachedPeers.isEmpty || forceFetch || timeout) {
800800
cachedPeers.clear()
801-
cachedPeers ++= master.getPeers(blockManagerId)
801+
cachedPeers ++= master.getPeers(blockManagerId).sortBy(_.hashCode)
802802
lastPeerFetchTime = System.currentTimeMillis
803803
logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
804804
}
@@ -817,6 +817,7 @@ private[spark] class BlockManager(
817817
val tLevel = StorageLevel(
818818
level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
819819
val startTime = System.nanoTime
820+
val random = new Random(blockId.hashCode)
820821

821822
var forceFetchPeers = false
822823
var failures = 0
@@ -825,7 +826,7 @@ private[spark] class BlockManager(
825826
// Get a random peer
826827
def getRandomPeer(): Option[BlockManagerId] = {
827828
val peers = getPeers(forceFetchPeers) -- peersReplicatedTo -- peersFailedToReplicateTo
828-
if (!peers.isEmpty) Some(peers.toSeq(Random.nextInt(peers.size))) else None
829+
if (!peers.isEmpty) Some(peers.toSeq(random.nextInt(peers.size))) else None
829830
}
830831

831832
// One by one choose a random peer and try uploading the block to it

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,13 +1283,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
12831283
assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
12841284
}
12851285

1286-
test("block replication - 2x") {
1286+
test("block replication - 2x replication") {
12871287
testReplication(2,
12881288
Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
12891289
)
12901290
}
12911291

1292-
test("block replication - 3x") {
1292+
test("block replication - 3x replication") {
12931293
// Generate storage levels with 3x replication
12941294
val storageLevels = {
12951295
Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
@@ -1316,7 +1316,44 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
13161316
testReplication(5, storageLevels)
13171317
}
13181318

1319-
test("block replication with addition and deletion of block managers") {
1319+
test("block replication - deterministic node selection") {
1320+
val blockSize = 1000
1321+
val storeSize = 10000
1322+
val stores = (1 to 5).map { i => makeBlockManager(storeSize, s"store$i") }
1323+
val storageLevel2x = StorageLevel.MEMORY_AND_DISK_2
1324+
val storageLevel3x = StorageLevel(true, true, false, true, 3)
1325+
val storageLevel4x = StorageLevel(true, true, false, true, 4)
1326+
1327+
def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
1328+
stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
1329+
val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
1330+
stores.foreach { _.removeBlock(blockId) }
1331+
master.removeBlock(blockId)
1332+
locations
1333+
}
1334+
1335+
val a1Locs = putBlockAndGetLocations("a1", storageLevel2x)
1336+
assert(putBlockAndGetLocations("a1", storageLevel2x) === a1Locs,
1337+
"Inserting a 2x replicated block second time gave different locations from the first")
1338+
1339+
val a2Locs3x = putBlockAndGetLocations("a2", storageLevel3x)
1340+
assert(putBlockAndGetLocations("a2", storageLevel3x) === a2Locs3x,
1341+
"Inserting a 3x replicated block second time gave different locations from the first")
1342+
val a2Locs2x = putBlockAndGetLocations("a2", storageLevel2x)
1343+
assert(
1344+
a2Locs2x.subsetOf(a2Locs3x),
1345+
"Inserting a with 2x replication gave locations that are not a subset of locations" +
1346+
s" with 3x replication [3x: ${a2Locs3x.mkString(",")}; 2x: ${a2Locs2x.mkString(",")}"
1347+
)
1348+
val a2Locs4x = putBlockAndGetLocations("a2", storageLevel4x)
1349+
assert(
1350+
a2Locs3x.subsetOf(a2Locs4x),
1351+
"Inserting a with 4x replication gave locations that are not a superset of locations " +
1352+
s"with 3x replication [3x: ${a2Locs3x.mkString(",")}; 4x: ${a2Locs4x.mkString(",")}"
1353+
)
1354+
}
1355+
1356+
test("block replication - addition and deletion of block managers") {
13201357
val blockSize = 1000
13211358
val storeSize = 10000
13221359
val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
@@ -1329,10 +1366,10 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
13291366
*/
13301367
def testPut(blockId: String, storageLevel: StorageLevel, expectedNumLocations: Int) {
13311368
try {
1332-
initialStores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
1369+
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
13331370
assert(master.getLocations(blockId).size === expectedNumLocations)
13341371
} finally {
1335-
master.removeBlock(blockId)
1372+
allStores.foreach { _.removeBlock(blockId) }
13361373
}
13371374
}
13381375

0 commit comments

Comments
 (0)