@@ -47,6 +47,12 @@ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
4747import org .apache .spark .shuffle .hash .HashShuffleManager
4848import org .apache .spark .storage .BlockManagerMessages .BlockManagerHeartbeat
4949import org .apache .spark .util .{AkkaUtils , ByteBufferInputStream , SizeEstimator , Utils }
50+ import org .apache .spark .storage .StorageLevel ._
51+ import org .apache .spark .storage .BlockManagerMessages .BlockManagerHeartbeat
52+ import org .apache .spark .storage .BroadcastBlockId
53+ import org .apache .spark .storage .RDDBlockId
54+ import org .apache .spark .storage .ShuffleBlockId
55+ import org .apache .spark .storage .TestBlockId
5056
5157
5258class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
@@ -120,7 +126,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
120126
121127 System .clearProperty(" spark.test.useCompressedOops" )
122128 }
123-
129+ /*
124130 test("StorageLevel object caching") {
125131 val level1 = StorageLevel(false, false, false, false, 3)
126132 val level2 = StorageLevel(false, false, false, false, 3) // this should return the same object as level1
@@ -1228,17 +1234,36 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
12281234 assert(unrollMemoryAfterB6 === unrollMemoryAfterB4)
12291235 assert(unrollMemoryAfterB7 === unrollMemoryAfterB4)
12301236 }
1231-
1237+ */
12321238 test(" block replication - 2x" ) {
1233- testReplication(2 )
1239+ testReplication(2 ,
1240+ Seq (MEMORY_ONLY , MEMORY_ONLY_SER , DISK_ONLY , MEMORY_AND_DISK_2 , MEMORY_AND_DISK_SER_2 )
1241+ )
12341242 }
12351243
12361244 test(" block replication - 3x" ) {
1237- testReplication(3 )
1245+ val storageLevels = {
1246+ Seq (MEMORY_ONLY , MEMORY_ONLY_SER , DISK_ONLY , MEMORY_AND_DISK , MEMORY_AND_DISK_SER ).map {
1247+ level => StorageLevel (
1248+ level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 3 )
1249+ }
1250+ }
1251+ testReplication(3 , storageLevels)
12381252 }
12391253
1240- test(" block replication - 4x" ) {
1241- testReplication(4 )
1254+ test(" block replication - mixed between 1x to 5x" ) {
1255+ val storageLevels = Seq (
1256+ MEMORY_ONLY ,
1257+ MEMORY_ONLY_SER_2 ,
1258+ StorageLevel (true , true , false , true , 3 ),
1259+ StorageLevel (true , false , false , false , 4 ),
1260+ StorageLevel (false , false , false , false , 5 ),
1261+ StorageLevel (true , false , false , false , 4 ),
1262+ StorageLevel (true , true , false , true , 3 ),
1263+ MEMORY_ONLY_SER_2 ,
1264+ MEMORY_ONLY
1265+ )
1266+ testReplication(3 , storageLevels)
12421267 }
12431268
12441269 /**
@@ -1248,25 +1273,19 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
12481273 * is correct. Then it also drops the block from memory of each store (using LRU) and
12491274 * again checks whether the master's knowledge gets updated.
12501275 */
1251- def testReplication (replicationFactor : Int ) {
1276+ def testReplication (maxReplication : Int , storageLevels : Seq [ StorageLevel ] ) {
12521277 import org .apache .spark .storage .StorageLevel ._
12531278
1254- assert(replicationFactor > 1 ,
1255- s " ReplicationTester cannot test replication factor $replicationFactor " )
1279+ assert(maxReplication > 1 ,
1280+ s " Cannot test replication factor $maxReplication " )
12561281
12571282 // storage levels to test with the given replication factor
1258- val storageLevels = {
1259- Seq (MEMORY_ONLY , MEMORY_ONLY_SER , DISK_ONLY , MEMORY_AND_DISK , MEMORY_AND_DISK_SER ).map {
1260- level => StorageLevel (
1261- level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, replicationFactor)
1262- }
1263- }
12641283
12651284 val storeSize = 10000
12661285 val blockSize = 1000
12671286
12681287 // As many stores as the replication factor
1269- val stores = (1 to replicationFactor ).map {
1288+ val stores = (1 to maxReplication ).map {
12701289 i => makeBlockManager(storeSize, s " store $i" )
12711290 }
12721291
@@ -1279,11 +1298,13 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
12791298 stores(0 ).putSingle(blockId, new Array [Byte ](blockSize), storageLevel)
12801299
12811300 // Assert that master know two locations for the block
1282- assert(master.getLocations(blockId).size === replicationFactor,
1283- s " master did not have $replicationFactor locations for $blockId" )
1301+ val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
1302+ assert(blockLocations.size === storageLevel.replication,
1303+ s " master did not have ${storageLevel.replication} locations for $blockId" )
12841304
1285- // Test state of the store for the block
1286- stores.foreach { testStore =>
1305+ // Test state of the stores that contain the block
1306+ stores.filter(testStore => blockLocations.contains(testStore.blockManagerId.executorId))
1307+ .foreach { testStore =>
12871308 val testStoreName = testStore.blockManagerId.executorId
12881309 assert(testStore.getLocal(blockId).isDefined, s " $blockId was not found in $testStoreName" )
12891310 assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
0 commit comments