Commit d081bf6

Fixed a bug in getPeers, and added unit tests that exercise getPeers and replication under executor churn.

1 parent 9f0ac9f

5 files changed: +155 −41 lines

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 31 additions & 6 deletions
@@ -37,6 +37,7 @@ import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.util._
+import scala.collection.mutable


 private[spark] sealed trait BlockValues
@@ -111,7 +112,8 @@ private[spark] class BlockManager(
     MetadataCleanerType.BLOCK_MANAGER, this.dropOldNonBroadcastBlocks, conf)
   private val broadcastCleaner = new MetadataCleaner(
     MetadataCleanerType.BROADCAST_VARS, this.dropOldBroadcastBlocks, conf)
-
+  private val cachedPeers = new ArrayBuffer[BlockManagerId]
+  private var lastPeerFetchTime = 0L
   initialize()

   /* The compression codec to use. Note that the "lazy" val is necessary because we want to delay
@@ -786,20 +788,42 @@
     updatedBlocks
   }

+  /**
+   * Get peer block managers in the system.
+   */
+  private def getPeers(numPeers: Int): Seq[BlockManagerId] = cachedPeers.synchronized {
+    val currentTime = System.currentTimeMillis
+    // If cache is empty or has insufficient number of peers, fetch from master
+    if (cachedPeers.isEmpty || numPeers > cachedPeers.size) {
+      cachedPeers.clear()
+      cachedPeers ++= master.getPeers(blockManagerId)
+      logDebug("Fetched peers from master: " + cachedPeers.mkString("[", ",", "]"))
+      lastPeerFetchTime = System.currentTimeMillis
+    }
+    if (numPeers > cachedPeers.size) {
+      // if enough peers cannot be provided, return all of them
+      logDebug(s"Not enough peers - cached peers = ${cachedPeers.size}, required peers = $numPeers")
+      cachedPeers
+    } else {
+      cachedPeers.take(numPeers)
+    }
+  }
+
   /**
    * Replicate block to another node.
    */
-  @volatile var cachedPeers: Seq[BlockManagerId] = null
   private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
     val tLevel = StorageLevel(
       level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
-    if (cachedPeers == null) {
-      cachedPeers = master.getPeers(blockManagerId, level.replication - 1)
+    val selectedPeers = getPeers(level.replication - 1)
+    if (selectedPeers.size < level.replication - 1) {
+      logWarning(s"Failed to replicate block to ${level.replication - 1} peer(s) " +
+        s"as only ${selectedPeers.size} peer(s) were found")
     }
-    for (peer: BlockManagerId <- cachedPeers) {
+    selectedPeers.foreach { peer =>
       val start = System.nanoTime
       data.rewind()
-      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} Bytes. " +
+      logDebug(s"Try to replicate $blockId once; The size of the data is ${data.limit()} bytes. " +
         s"To node: $peer")

       try {
@@ -808,6 +832,7 @@
       } catch {
         case e: Exception =>
           logError(s"Failed to replicate block to $peer", e)
+          cachedPeers.synchronized { cachedPeers.clear() }
       }

       logDebug("Replicating BlockId %s once used %fs; The size of the data is %d bytes."
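The change above replaces the old fetch-once @volatile cache with a cache-aside pattern: getPeers refetches from the master only when the cached list cannot satisfy a request, and replicate clears the cache whenever a send fails, so the next attempt sees a fresh view of the cluster. Below is a minimal, self-contained sketch of that pattern; PeerCacheSketch, Peer, fetchFromMaster, and sendTo are hypothetical stand-ins for illustration, not Spark APIs.

import scala.collection.mutable.ArrayBuffer

// Cache-aside sketch: refetch peers only when the cache cannot satisfy the
// request, and drop the cache when a replication attempt fails.
object PeerCacheSketch {
  type Peer = String
  private val cachedPeers = new ArrayBuffer[Peer]

  // Hypothetical stand-in for BlockManagerMaster.getPeers
  private def fetchFromMaster(): Seq[Peer] = Seq("exec1", "exec2", "exec3")

  def getPeers(numPeers: Int): Seq[Peer] = cachedPeers.synchronized {
    if (cachedPeers.isEmpty || numPeers > cachedPeers.size) {
      cachedPeers.clear()
      cachedPeers ++= fetchFromMaster()
    }
    // If there are still not enough peers, return all of them
    if (numPeers > cachedPeers.size) cachedPeers.toList else cachedPeers.take(numPeers).toList
  }

  def replicate(numReplicas: Int)(sendTo: Peer => Unit): Unit = {
    getPeers(numReplicas).foreach { peer =>
      try {
        sendTo(peer)
      } catch {
        case _: Exception =>
          // Invalidate so the next getPeers refetches a fresh cluster view
          cachedPeers.synchronized { cachedPeers.clear() }
      }
    }
  }
}

The trade-off mirrors the commit: a stale peer list is tolerated until a replication actually fails, which avoids a round trip to the master on every put.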

core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala

Lines changed: 2 additions & 7 deletions
@@ -84,13 +84,8 @@ class BlockManagerMaster(
   }

   /** Get ids of other nodes in the cluster from the driver */
-  def getPeers(blockManagerId: BlockManagerId, numPeers: Int): Seq[BlockManagerId] = {
-    val result = askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId, numPeers))
-    if (result.length != numPeers) {
-      throw new SparkException(
-        "Error getting peers, only got " + result.size + " instead of " + numPeers)
-    }
-    result
+  def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    askDriverWithReply[Seq[BlockManagerId]](GetPeers(blockManagerId))
   }

   /**
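Note the contract change: getPeers no longer throws a SparkException when fewer than the requested number of peers exist; it returns whatever the driver knows about, and the caller decides how to degrade. A hedged sketch of the caller-side handling (GetPeersContractSketch and selectPeers are hypothetical helpers mirroring the logWarning in BlockManager.replicate, not Spark code):

// With the numPeers argument gone, a shortfall is no longer an error thrown
// by the master but a condition the caller inspects.
object GetPeersContractSketch extends App {
  def selectPeers(requested: Int, registered: Seq[String]): Seq[String] = {
    if (registered.size < requested) {
      println(s"Only ${registered.size} of $requested requested peer(s) available")
    }
    registered.take(requested)
  }

  assert(selectPeers(3, Seq("exec1", "exec2")) == Seq("exec1", "exec2"))
  assert(selectPeers(1, Seq("exec1", "exec2")) == Seq("exec1"))
}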

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala

Lines changed: 14 additions & 10 deletions
@@ -83,8 +83,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     case GetLocationsMultipleBlockIds(blockIds) =>
       sender ! getLocationsMultipleBlockIds(blockIds)

-    case GetPeers(blockManagerId, size) =>
-      sender ! getPeers(blockManagerId, size)
+    case GetPeers(blockManagerId) =>
+      sender ! getPeers(blockManagerId)

     case GetMemoryStatus =>
       sender ! memoryStatus
@@ -403,16 +403,20 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
     blockIds.map(blockId => getLocations(blockId))
   }

-  private def getPeers(blockManagerId: BlockManagerId, size: Int): Seq[BlockManagerId] = {
-    val peers: Array[BlockManagerId] = blockManagerInfo.keySet.toArray
-
-    val selfIndex = peers.indexOf(blockManagerId)
+  /** Get the list of the peers of the given block manager */
+  private def getPeers(blockManagerId: BlockManagerId): Seq[BlockManagerId] = {
+    val blockManagerIds = blockManagerInfo.keySet.toArray
+    val selfIndex = blockManagerIds.indexOf(blockManagerId)
     if (selfIndex == -1) {
-      throw new SparkException("Self index for " + blockManagerId + " not found")
+      logError("Self index for " + blockManagerId + " not found")
+      Seq.empty
+    } else {
+      // If the blockManagerIds is [ id1 id2 id3 id4 id5 ] and the blockManagerId is id2
+      // Then this code will return the list [ id3 id4 id5 id1 ]
+      Array.tabulate[BlockManagerId](blockManagerIds.size - 1) { i =>
+        blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
+      }
     }
-
-    // Note that this logic will select the same node multiple times if there aren't enough peers
-    Array.tabulate[BlockManagerId](size) { i => peers((selfIndex + i + 1) % peers.length) }.toSeq
   }
 }
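The rotated selection in getPeers is easy to sanity-check in isolation. This standalone sketch (plain strings stand in for BlockManagerId; PeerRotationSketch is illustrative only) reproduces the Array.tabulate arithmetic and the worked example from the comment above:

// With ids [ id1 id2 id3 id4 id5 ] and self = id2, the result is [ id3 id4 id5 id1 ]
object PeerRotationSketch extends App {
  val blockManagerIds = Array("id1", "id2", "id3", "id4", "id5")
  val selfIndex = blockManagerIds.indexOf("id2")  // 1
  val peers = Array.tabulate[String](blockManagerIds.size - 1) { i =>
    blockManagerIds((selfIndex + i + 1) % blockManagerIds.size)
  }
  assert(peers.sameElements(Array("id3", "id4", "id5", "id1")))
  println(peers.mkString("[ ", " ", " ]"))
}

Because the result always has blockManagerIds.size - 1 elements, the old behavior of selecting the same node multiple times when there were too few peers is gone; a shortfall now surfaces at the caller instead.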

core/src/main/scala/org/apache/spark/storage/BlockManagerMessages.scala

Lines changed: 1 addition & 1 deletion
@@ -88,7 +88,7 @@ private[spark] object BlockManagerMessages {

   case class GetLocationsMultipleBlockIds(blockIds: Array[BlockId]) extends ToBlockManagerMaster

-  case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+  case class GetPeers(blockManagerId: BlockManagerId) extends ToBlockManagerMaster

   case class RemoveExecutor(execId: String) extends ToBlockManagerMaster

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

Lines changed: 107 additions & 17 deletions
@@ -49,10 +49,6 @@ import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
 import org.apache.spark.util.{AkkaUtils, ByteBufferInputStream, SizeEstimator, Utils}
 import org.apache.spark.storage.StorageLevel._
 import org.apache.spark.storage.BlockManagerMessages.BlockManagerHeartbeat
-import org.apache.spark.storage.BroadcastBlockId
-import org.apache.spark.storage.RDDBlockId
-import org.apache.spark.storage.ShuffleBlockId
-import org.apache.spark.storage.TestBlockId


 class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
@@ -126,7 +122,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter

     System.clearProperty("spark.test.useCompressedOops")
   }
-  /*
+
   test("StorageLevel object caching") {
     val level1 = StorageLevel(false, false, false, false, 3)
     val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1
@@ -195,7 +191,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     store = makeBlockManager(2000, "exec1")
     store2 = makeBlockManager(2000, "exec2")

-    val peers = master.getPeers(store.blockManagerId, 1)
+    val peers = master.getPeers(store.blockManagerId)
     assert(peers.size === 1, "master did not return the other manager as a peer")
     assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")

@@ -454,7 +450,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     val list2DiskGet = store.get("list2disk")
     assert(list2DiskGet.isDefined, "list2memory expected to be in store")
     assert(list2DiskGet.get.data.size === 3)
-    System.out.println(list2DiskGet)
     // We don't know the exact size of the data on disk, but it should certainly be > 0.
     assert(list2DiskGet.get.inputMetrics.bytesRead > 0)
     assert(list2DiskGet.get.inputMetrics.readMethod === DataReadMethod.Disk)
@@ -1234,14 +1229,52 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
     assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
   }
-  */
+
+  test("get peers with store addition and removal") {
+    val numStores = 4
+    val stores = (1 to numStores - 1).map { i => makeBlockManager(1000, s"store$i") }
+    try {
+      val storeIds = stores.map { _.blockManagerId }.toSet
+      assert(master.getPeers(stores(0).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(0).blockManagerId })
+      assert(master.getPeers(stores(1).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(1).blockManagerId })
+      assert(master.getPeers(stores(2).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(2).blockManagerId })
+
+      // Add a new store and test whether get peers returns it
+      val newStore = makeBlockManager(1000, s"store$numStores")
+      assert(master.getPeers(stores(0).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(0).blockManagerId } + newStore.blockManagerId)
+      assert(master.getPeers(stores(1).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(1).blockManagerId } + newStore.blockManagerId)
+      assert(master.getPeers(stores(2).blockManagerId).toSet ===
+        storeIds.filterNot { _ == stores(2).blockManagerId } + newStore.blockManagerId)
+      assert(master.getPeers(newStore.blockManagerId).toSet === storeIds)
+
+      // Remove a store and test that get peers no longer returns it
+      val storeIdToRemove = stores(0).blockManagerId
+      master.removeExecutor(storeIdToRemove.executorId)
+      assert(!master.getPeers(stores(1).blockManagerId).contains(storeIdToRemove))
+      assert(!master.getPeers(stores(2).blockManagerId).contains(storeIdToRemove))
+      assert(!master.getPeers(newStore.blockManagerId).contains(storeIdToRemove))
+
+      // Test whether asking for peers of an unregistered block manager id returns an empty list
+      assert(master.getPeers(stores(0).blockManagerId).isEmpty)
+      assert(master.getPeers(BlockManagerId("", "", 1)).isEmpty)
+    } finally {
+      stores.foreach { _.stop() }
+    }
+  }
+
   test("block replication - 2x") {
     testReplication(2,
       Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK_2, MEMORY_AND_DISK_SER_2)
     )
   }

   test("block replication - 3x") {
+    // Generate storage levels with 3x replication
     val storageLevels = {
       Seq(MEMORY_ONLY, MEMORY_ONLY_SER, DISK_ONLY, MEMORY_AND_DISK, MEMORY_AND_DISK_SER).map {
         level => StorageLevel(
@@ -1252,18 +1285,77 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
   }

   test("block replication - mixed between 1x to 5x") {
+    // Generate storage levels with varying replication
     val storageLevels = Seq(
       MEMORY_ONLY,
       MEMORY_ONLY_SER_2,
-      StorageLevel(true, true, false, true, 3),
-      StorageLevel(true, false, false, false, 4),
-      StorageLevel(false, false, false, false, 5),
-      StorageLevel(true, false, false, false, 4),
-      StorageLevel(true, true, false, true, 3),
+      StorageLevel(true, false, false, false, 3),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, true, false, false, 5),
+      StorageLevel(true, true, false, true, 4),
+      StorageLevel(true, false, false, false, 3),
       MEMORY_ONLY_SER_2,
       MEMORY_ONLY
     )
-    testReplication(3, storageLevels)
+    testReplication(5, storageLevels)
+  }
+
+  test("block replication with addition and removal of executors") {
+    val blockSize = 1000
+    val storeSize = 10000
+    val allStores = new ArrayBuffer[BlockManager]()
+
+    try {
+      val initialStores = (1 to 2).map { i => makeBlockManager(storeSize, s"store$i") }
+      allStores ++= initialStores
+
+      // 2x replication works
+      initialStores(0).putSingle("a1", new Array[Byte](blockSize), StorageLevel.MEMORY_AND_DISK_2)
+      assert(master.getLocations("a1").size === 2)
+
+      // 3x replication should only replicate 2x
+      initialStores(0).putSingle(
+        "a2", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
+      assert(master.getLocations("a2").size === 2)
+
+      val newStore1 = makeBlockManager(storeSize, s"newstore1")
+      allStores += newStore1
+
+      // 3x replication should work now
+      initialStores(0).putSingle(
+        "a3", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
+      assert(master.getLocations("a3").size === 3)
+
+      // 4x replication should only replicate 3x
+      initialStores(0).putSingle(
+        "a4", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
+      assert(master.getLocations("a4").size === 3)
+
+      val newStore2 = makeBlockManager(storeSize, s"newstore2")
+      allStores += newStore2
+
+      // 4x replication should work now
+      initialStores(0).putSingle(
+        "a5", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 4))
+      assert(master.getLocations("a5").size === 4)
+
+      // Remove all the stores
+      (initialStores ++ Seq(newStore1, newStore2)).map { store =>
+        store.blockManagerId.executorId
+      }.foreach { execId =>
+        master.removeExecutor(execId)
+      }
+
+      // Add new stores and test if replication works
+      val newStores = (3 to 5).map { i => makeBlockManager(storeSize, s"newstore$i") }
+      allStores ++= newStores
+
+      newStores(0).putSingle(
+        "a6", new Array[Byte](blockSize), StorageLevel(true, true, false, true, 3))
+      assert(master.getLocations("a6").size === 3)
+    } finally {
+      allStores.foreach { _.stop() }
+    }
   }

   /**
@@ -1273,7 +1365,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
    * is correct. Then it also drops the block from memory of each store (using LRU) and
    * again checks whether the master's knowledge gets updated.
    */
-  def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
+  private def testReplication(maxReplication: Int, storageLevels: Seq[StorageLevel]) {
     import org.apache.spark.storage.StorageLevel._

     assert(maxReplication > 1,
@@ -1291,7 +1383,6 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter

     try {
       storageLevels.foreach {storageLevel =>
-
         // Put the block into one of the stores
         val blockId = new TestBlockId(
           "block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
@@ -1364,4 +1455,3 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
     }
   }
 }
-
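The executor-churn test above encodes a simple invariant: the achieved replica count is the requested replication capped by the number of live stores. A tiny framework-free sketch of that invariant (ReplicaCountSketch and expectedReplicas are hypothetical helpers, not part of the suite):

object ReplicaCountSketch extends App {
  // The achieved replica count is the requested replication capped by the
  // number of live stores, which is what the assertions above check.
  def expectedReplicas(requestedReplication: Int, liveStores: Int): Int =
    math.min(requestedReplication, liveStores)

  assert(expectedReplicas(2, 2) == 2)  // "2x replication works"
  assert(expectedReplicas(3, 2) == 2)  // "3x replication should only replicate 2x"
  assert(expectedReplicas(3, 3) == 3)  // "3x replication should work now"
  assert(expectedReplicas(4, 3) == 3)  // "4x replication should only replicate 3x"
}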