
Commit 8135d31

Fix flaky test
1 parent a949741 commit 8135d31

2 files changed: +14 −10 lines

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 3 additions & 1 deletion

@@ -98,7 +98,7 @@ class ReliableKafkaReceiver[
 
   /** Manage the BlockGenerator in receiver itself for better managing block store and offset
    * commit */
-  private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+  private var blockGenerator: BlockGenerator = null
 
   override def onStop(): Unit = {
     if (consumerConnector != null) {
@@ -117,6 +117,8 @@ class ReliableKafkaReceiver[
   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
 
+    blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
         "otherwise we will manually set it to false to turn off auto offset commit in Kafka")

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 11 additions & 9 deletions

@@ -139,7 +139,7 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
+      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
       batchDuration)
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
@@ -155,10 +155,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       "auto.offset.reset" -> "smallest")
 
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => throw new Exception)
     try {
       ssc.start()
       ssc.awaitTermination(1000)
@@ -175,10 +176,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
     // Restart to see if data is consumed from last checkpoint.
     ssc = new StreamingContext(sparkConf, batchDuration)
     KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit)
+      ssc,
+      kafkaParams,
+      topics,
+      StorageLevel.MEMORY_ONLY)
+      .foreachRDD(_ => Unit)
     ssc.start()
     ssc.awaitTermination(3000)
     ssc.stop()
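The substantive test change is the block interval, raised from 4000 to 10000 ms; the reshuffled createStream call chains are formatting only. One plausible reading is that the larger interval guarantees the first, deliberately failing run (which only waits awaitTermination(1000)) ends before any block is generated or offset committed, so the restarted context re-consumes the data deterministically. A small sketch of that timing relation follows; the constant names are illustrative, not taken from the test itself.

// Sketch only: the timing assumption that presumably makes the test deterministic.
object FlakyTimingSketch extends App {
  val blockIntervalMs   = 10000L  // spark.streaming.blockInterval after the fix
  val firstRunTimeoutMs = 1000L   // ssc.awaitTermination(1000) for the failing run

  // The failing run should finish well before the first block would be cut,
  // leaving no committed offsets behind for the restarted context to skip.
  require(firstRunTimeoutMs < blockIntervalMs,
    "simulated failure must happen before the first block is generated")
  println(s"first run ($firstRunTimeoutMs ms) ends before the first block ($blockIntervalMs ms)")
}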
