@@ -26,7 +26,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
2626import org .apache .spark .storage .{BlockId , BlockManager , StorageLevel , StreamBlockId }
2727import org .apache .spark .streaming .util .{FileBasedWriteAheadLogSegment , FileBasedWriteAheadLogWriter }
2828import org .apache .spark .util .Utils
29- import org .apache .spark .{SparkConf , SparkContext }
29+ import org .apache .spark .{SparkConf , SparkContext , SparkException }
3030
3131class WriteAheadLogBackedBlockRDDSuite
3232 extends FunSuite with BeforeAndAfterAll with BeforeAndAfterEach {
@@ -60,48 +60,88 @@ class WriteAheadLogBackedBlockRDDSuite
6060 System .clearProperty(" spark.driver.port" )
6161 }
6262
63- test(" Read data available in block manager and write ahead log" ) {
64- testRDD(5 , 5 )
63+ test(" Read data available in both block manager and write ahead log" ) {
64+ testRDD(numPartitions = 5 , numPartitionsInBM = 5 , numPartitionsInWAL = 5 )
6565 }
6666
6767 test(" Read data available only in block manager, not in write ahead log" ) {
68- testRDD(5 , 0 )
68+ testRDD(numPartitions = 5 , numPartitionsInBM = 5 , numPartitionsInWAL = 0 )
6969 }
7070
7171 test(" Read data available only in write ahead log, not in block manager" ) {
72- testRDD(0 , 5 )
72+ testRDD(numPartitions = 5 , numPartitionsInBM = 0 , numPartitionsInWAL = 5 )
7373 }
7474
75- test(" Read data available only in write ahead log , and test storing in block manager " ) {
76- testRDD(0 , 5 , testStoreInBM = true )
75+ test(" Read data with partially available in block manager , and rest in write ahead log " ) {
76+ testRDD(numPartitions = 5 , numPartitionsInBM = 3 , numPartitionsInWAL = 2 )
7777 }
7878
79- test(" Read data with partially available in block manager, and rest in write ahead log" ) {
80- testRDD(3 , 2 )
79+ test(" Test isBlockValid skips block fetching from BlockManager" ) {
80+ testRDD(
81+ numPartitions = 5 , numPartitionsInBM = 5 , numPartitionsInWAL = 0 , testIsBlockValid = true )
82+ }
83+
84+ test(" Test whether RDD is valid after removing blocks from block manager" ) {
85+ testRDD(
86+ numPartitions = 5 , numPartitionsInBM = 5 , numPartitionsInWAL = 5 , testBlockRemove = true )
87+ }
88+
89+ test(" Test storing of blocks recovered from write ahead log back into block manager" ) {
90+ testRDD(
91+ numPartitions = 5 , numPartitionsInBM = 0 , numPartitionsInWAL = 5 , testStoreInBM = true )
8192 }
8293
8394 /**
8495 * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to block manager
8596 * and the rest to a write ahead log, and then reading reading it all back using the RDD.
8697 * It can also test if the partitions that were read from the log were again stored in
8798 * block manager.
88- * @param numPartitionsInBM Number of partitions to write to the Block Manager
89- * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
90- * @param testStoreInBM Test whether blocks read from log are stored back into block manager
99+ *
100+ *
101+ *
102+ * @param numPartitions Number of partitions in RDD
103+ * @param numPartitionsInBM Number of partitions to write to the BlockManager.
104+ * Partitions 0 to (numPartitionsInBM-1) will be written to BlockManager
105+ * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log.
106+ * Partitions (numPartitions - 1 - numPartitionsInWAL) to
107+ * (numPartitions - 1) will be written to WAL
108+ * @param testIsBlockValid Test whether setting isBlockValid to false skips block fetching
109+ * @param testBlockRemove Test whether calling rdd.removeBlock() makes the RDD still usable with
110+ * reads falling back to the WAL
111+ * @param testStoreInBM Test whether blocks read from log are stored back into block manager
112+ *
113+ * Example with numPartitions = 5, numPartitionsInBM = 3, and numPartitionsInWAL = 4
114+ *
115+ * numPartitionsInBM = 3
116+ * |------------------|
117+ * | |
118+ * 0 1 2 3 4
119+ * | |
120+ * |-------------------------|
121+ * numPartitionsInWAL = 4
91122 */
92123 private def testRDD (
93- numPartitionsInBM : Int , numPartitionsInWAL : Int , testStoreInBM : Boolean = false ) {
94- val numBlocks = numPartitionsInBM + numPartitionsInWAL
95- val data = Seq .fill(numBlocks, 10 )(scala.util.Random .nextString(50 ))
124+ numPartitions : Int ,
125+ numPartitionsInBM : Int ,
126+ numPartitionsInWAL : Int ,
127+ testIsBlockValid : Boolean = false ,
128+ testBlockRemove : Boolean = false ,
129+ testStoreInBM : Boolean = false
130+ ) {
131+ require(numPartitionsInBM <= numPartitions,
132+ " Can't put more partitions in BlockManager than that in RDD" )
133+ require(numPartitionsInWAL <= numPartitions,
134+ " Can't put more partitions in write ahead log than that in RDD" )
135+ val data = Seq .fill(numPartitions, 10 )(scala.util.Random .nextString(50 ))
96136
97137 // Put the necessary blocks in the block manager
98- val blockIds = Array .fill(numBlocks )(StreamBlockId (Random .nextInt(), Random .nextInt()))
138+ val blockIds = Array .fill(numPartitions )(StreamBlockId (Random .nextInt(), Random .nextInt()))
99139 data.zip(blockIds).take(numPartitionsInBM).foreach { case (block, blockId) =>
100140 blockManager.putIterator(blockId, block.iterator, StorageLevel .MEMORY_ONLY_SER )
101141 }
102142
103143 // Generate write ahead log record handles
104- val recordHandles = generateFakeRecordHandles(numPartitionsInBM ) ++
144+ val recordHandles = generateFakeRecordHandles(numPartitions - numPartitionsInWAL ) ++
105145 generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
106146 blockIds.takeRight(numPartitionsInWAL))
107147
@@ -111,7 +151,7 @@ class WriteAheadLogBackedBlockRDDSuite
111151 " Expected blocks not in BlockManager"
112152 )
113153 require(
114- blockIds.takeRight(numPartitionsInWAL ).forall(blockManager.get(_).isEmpty),
154+ blockIds.takeRight(numPartitions - numPartitionsInBM ).forall(blockManager.get(_).isEmpty),
115155 " Unexpected blocks in BlockManager"
116156 )
117157
@@ -122,16 +162,39 @@ class WriteAheadLogBackedBlockRDDSuite
122162 " Expected blocks not in write ahead log"
123163 )
124164 require(
125- recordHandles.take(numPartitionsInBM ).forall(s =>
165+ recordHandles.take(numPartitions - numPartitionsInWAL ).forall(s =>
126166 ! new File (s.path.stripPrefix(" file://" )).exists()),
127167 " Unexpected blocks in write ahead log"
128168 )
129169
130170 // Create the RDD and verify whether the returned data is correct
131171 val rdd = new WriteAheadLogBackedBlockRDD [String ](sparkContext, blockIds.toArray,
132- recordHandles.toArray, storeInBlockManager = false , storageLevel = StorageLevel . MEMORY_ONLY )
172+ recordHandles.toArray, storeInBlockManager = false )
133173 assert(rdd.collect() === data.flatten)
134174
175+ // Verify that the block fetching is skipped when isBlockValid is set to false.
176+ // This is done by using a RDD whose data is only in memory but is set to skip block fetching
177+ // Using that RDD will throw exception, as it skips block fetching even if the blocks are in
178+ // in BlockManager.
179+ if (testIsBlockValid) {
180+ require(numPartitionsInBM === numPartitions, " All partitions must be in BlockManager" )
181+ require(numPartitionsInWAL === 0 , " No partitions must be in WAL" )
182+ val rdd2 = new WriteAheadLogBackedBlockRDD [String ](sparkContext, blockIds.toArray,
183+ recordHandles.toArray, isBlockIdValid = Array .fill(blockIds.length)(false ))
184+ intercept[SparkException ] {
185+ rdd2.collect()
186+ }
187+ }
188+
189+ // Verify that the RDD is not invalid after the blocks are removed and can still read data
190+ // from write ahead log
191+ if (testBlockRemove) {
192+ require(numPartitions === numPartitionsInWAL, " All partitions must be in WAL for this test" )
193+ require(numPartitionsInBM > 0 , " Some partitions must be in BlockManager for this test" )
194+ rdd.removeBlocks()
195+ assert(rdd.collect() === data.flatten)
196+ }
197+
135198 if (testStoreInBM) {
136199 val rdd2 = new WriteAheadLogBackedBlockRDD [String ](sparkContext, blockIds.toArray,
137200 recordHandles.toArray, storeInBlockManager = true , storageLevel = StorageLevel .MEMORY_ONLY )
0 commit comments