Skip to content

Commit 89aaf9b

Browse files
committed
Filtered out non-existent blocks before creating BlockRDD
1 parent 053d94f commit 89aaf9b

File tree

3 files changed

+154
-2
lines changed

3 files changed

+154
-2
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
116116
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
117117
}
118118
}
119-
new BlockRDD[T](ssc.sc, blockIds)
119+
val validBlockIds = blockIds.filter { id =>
120+
ssc.sparkContext.env.blockManager.master.contains(id)
121+
}
122+
new BlockRDD[T](ssc.sc, validBlockIds)
120123
}
121124
} else {
122125
// If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD

streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ private[streaming]
7575
class WriteAheadLogBackedBlockRDD[T: ClassTag](
7676
@transient sc: SparkContext,
7777
@transient blockIds: Array[BlockId],
78-
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
78+
@transient val walRecordHandles: Array[WriteAheadLogRecordHandle],
7979
@transient isBlockIdValid: Array[Boolean] = Array.empty,
8080
storeInBlockManager: Boolean = false,
8181
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming
19+
20+
import scala.util.Random
21+
22+
import org.scalatest.BeforeAndAfterAll
23+
24+
import org.apache.spark.rdd.BlockRDD
25+
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
26+
import org.apache.spark.streaming.dstream.ReceiverInputDStream
27+
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
28+
import org.apache.spark.streaming.receiver.{BlockManagerBasedStoreResult, Receiver, WriteAheadLogBasedStoreResult}
29+
import org.apache.spark.streaming.scheduler.ReceivedBlockInfo
30+
import org.apache.spark.streaming.util.{WriteAheadLogRecordHandle, WriteAheadLogUtils}
31+
import org.apache.spark.{SparkConf, SparkEnv}
32+
33+
class ReceiverInputDStreamSuite extends TestSuiteBase with BeforeAndAfterAll {
34+
35+
override def afterAll(): Unit = {
36+
StreamingContext.getActive().map { _.stop() }
37+
}
38+
39+
40+
testWithoutWAL("createBlockRDD creates empty BlockRDD when no block info") { receiverStream =>
41+
val rdd = receiverStream.createBlockRDD(Time(0), Seq.empty)
42+
assert(rdd.isInstanceOf[BlockRDD[_]])
43+
assert(rdd.isEmpty())
44+
}
45+
46+
testWithoutWAL("createBlockRDD creates correct BlockRDD with block info") { receiverStream =>
47+
val blockInfos = Seq.fill(5) { createBlockInfo(withWALInfo = false) }
48+
val blockIds = blockInfos.map(_.blockId)
49+
50+
// Verify that there are some blocks that are present, and some that are not
51+
require(blockIds.forall(blockId => SparkEnv.get.blockManager.master.contains(blockId)))
52+
53+
val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
54+
assert(rdd.isInstanceOf[BlockRDD[_]])
55+
val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
56+
assert(blockRDD.blockIds.toSeq === blockIds)
57+
}
58+
59+
testWithoutWAL("createBlockRDD filters non-existent blocks before creating BlockRDD") {
60+
receiverStream =>
61+
val presentBlockInfos = Seq.fill(2)(createBlockInfo(withWALInfo = false, createBlock = true))
62+
val absentBlockInfos = Seq.fill(3)(createBlockInfo(withWALInfo = false, createBlock = false))
63+
val blockInfos = presentBlockInfos ++ absentBlockInfos
64+
val blockIds = blockInfos.map(_.blockId)
65+
66+
// Verify that there are some blocks that are present, and some that are not
67+
require(blockIds.exists(blockId => SparkEnv.get.blockManager.master.contains(blockId)))
68+
require(blockIds.exists(blockId => !SparkEnv.get.blockManager.master.contains(blockId)))
69+
70+
val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
71+
assert(rdd.isInstanceOf[BlockRDD[_]])
72+
val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
73+
assert(blockRDD.blockIds.toSeq === presentBlockInfos.map { _.blockId})
74+
}
75+
76+
testWithWAL("createBlockRDD creates empty WALBackedBlockRDD when no block info") {
77+
receiverStream =>
78+
val rdd = receiverStream.createBlockRDD(Time(0), Seq.empty)
79+
assert(rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
80+
assert(rdd.isEmpty())
81+
}
82+
83+
testWithWAL(
84+
"createBlockRDD creates correct WALBackedBlockRDD with all block info having WAL info") {
85+
receiverStream =>
86+
val blockInfos = Seq.fill(5) { createBlockInfo(withWALInfo = true) }
87+
val blockIds = blockInfos.map(_.blockId)
88+
val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
89+
assert(rdd.isInstanceOf[WriteAheadLogBackedBlockRDD[_]])
90+
val blockRDD = rdd.asInstanceOf[WriteAheadLogBackedBlockRDD[_]]
91+
assert(blockRDD.blockIds.toSeq === blockIds)
92+
assert(blockRDD.walRecordHandles.toSeq === blockInfos.map { _.walRecordHandleOption.get })
93+
}
94+
95+
testWithWAL("createBlockRDD creates BlockRDD when some block info dont have WAL info") {
96+
receiverStream =>
97+
val blockInfos1 = Seq.fill(2) { createBlockInfo(withWALInfo = true) }
98+
val blockInfos2 = Seq.fill(3) { createBlockInfo(withWALInfo = false) }
99+
val blockInfos = blockInfos1 ++ blockInfos2
100+
val blockIds = blockInfos.map(_.blockId)
101+
val rdd = receiverStream.createBlockRDD(Time(0), blockInfos)
102+
assert(rdd.isInstanceOf[BlockRDD[_]])
103+
val blockRDD = rdd.asInstanceOf[BlockRDD[_]]
104+
assert(blockRDD.blockIds.toSeq === blockIds)
105+
}
106+
107+
108+
private def testWithoutWAL(msg: String)(body: ReceiverInputDStream[_] => Unit): Unit = {
109+
test(s"Without WAL enabled: $msg") {
110+
runTest(enableWAL = false, body)
111+
}
112+
}
113+
114+
private def testWithWAL(msg: String)(body: ReceiverInputDStream[_] => Unit):Unit = {
115+
test(s"With WAL enabled: $msg") {
116+
runTest(enableWAL = true, body)
117+
}
118+
}
119+
120+
private def runTest(enableWAL: Boolean, body: ReceiverInputDStream[_] => Unit): Unit = {
121+
val conf = new SparkConf()
122+
conf.setMaster("local[4]").setAppName("ReceiverInputDStreamSuite")
123+
conf.set(WriteAheadLogUtils.RECEIVER_WAL_ENABLE_CONF_KEY, enableWAL.toString)
124+
require(WriteAheadLogUtils.enableReceiverLog(conf) === enableWAL)
125+
val ssc = new StreamingContext(conf, Seconds(1))
126+
val receiverStream = new ReceiverInputDStream[Int](ssc) {
127+
override def getReceiver(): Receiver[Int] = null
128+
}
129+
withStreamingContext(ssc) { ssc =>
130+
body(receiverStream)
131+
}
132+
}
133+
134+
private def createBlockInfo(
135+
withWALInfo: Boolean,
136+
createBlock: Boolean = true): ReceivedBlockInfo = {
137+
val blockId = new StreamBlockId(0, Random.nextLong())
138+
if (createBlock) {
139+
SparkEnv.get.blockManager.putSingle(blockId, 1, StorageLevel.MEMORY_ONLY, tellMaster = true)
140+
require(SparkEnv.get.blockManager.master.contains(blockId))
141+
}
142+
val storeResult = if (withWALInfo) {
143+
new WriteAheadLogBasedStoreResult(blockId, None, new WriteAheadLogRecordHandle { })
144+
} else {
145+
new BlockManagerBasedStoreResult(blockId, None)
146+
}
147+
new ReceivedBlockInfo(0, None, None, storeResult)
148+
}
149+
}

0 commit comments

Comments
 (0)